Skip to content

Commit

Permalink
isomorphic-wrtc: initial
Browse files Browse the repository at this point in the history
  • Loading branch information
tharvik committed Feb 12, 2024
1 parent 8844918 commit 993f8de
Show file tree
Hide file tree
Showing 13 changed files with 4,337 additions and 727 deletions.
2 changes: 1 addition & 1 deletion discojs/discojs-core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
},
"homepage": "https://github.com/epfml/disco#readme",
"dependencies": {
"@roamhq/wrtc": "^0.7.1",
"@tensorflow/tfjs": "4",
"@types/msgpack-lite": "0.1",
"axios": "0.27",
"gpt3-tokenizer": "1",
"immutable": "4",
"isomorphic-wrtc": "*",
"isomorphic-ws": "4",
"msgpack-lite": "0.1",
"simple-peer": "9",
Expand Down
12 changes: 5 additions & 7 deletions discojs/discojs-core/src/client/decentralized/base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ export class Base extends Client {
/**
* The pool of peers to communicate with during the current training round.
*/
private pool?: Promise<PeerPool>
private pool?: PeerPool
private connections?: Map<NodeID, PeerConnection>

/**
Expand Down Expand Up @@ -55,8 +55,7 @@ export class Base extends Client {
throw new Error('waiting for peers but peer pool is undefined')
}

const pool = await this.pool
const connections = await pool.getPeers(
const connections = await this.pool.getPeers(
peers,
this.server,
// Init receipt of peers weights
Expand Down Expand Up @@ -90,7 +89,7 @@ export class Base extends Client {
if (this.pool === undefined) {
throw new Error('received signal but peer pool is undefined')
}
void this.pool.then((pool) => pool.signal(event.peer, event.signal))
this.pool.signal(event.peer, event.signal)
})

return server
Expand Down Expand Up @@ -125,13 +124,12 @@ export class Base extends Client {
throw new Error('received id from server but was already received')
}
this._ownId = peerIdMsg.id
this.pool = PeerPool.init(peerIdMsg.id)
this.pool = new PeerPool(peerIdMsg.id)
}

async disconnect (): Promise<void> {
// Disconnect from peers
const pool = await this.pool
pool?.shutdown()
this.pool?.shutdown()
this.pool = undefined

if (this.connections !== undefined) {
Expand Down
5 changes: 2 additions & 3 deletions discojs/discojs-core/src/client/decentralized/peer.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { assert } from 'chai'
import { List, Range, Set } from 'immutable'
import wrtc from '@roamhq/wrtc'

import { Peer } from './peer'

Expand All @@ -10,8 +9,8 @@ describe.skip('peer', function () {
let peer2: Peer

beforeEach(async () => {
peer1 = new Peer('1', { wrtc })
peer2 = new Peer('2', { wrtc, initiator: true })
peer1 = new Peer('1')
peer2 = new Peer('2', true)
const peers = Set.of(peer1, peer2)

peer1.on('signal', (signal) => peer2.signal(signal))
Expand Down
6 changes: 3 additions & 3 deletions discojs/discojs-core/src/client/decentralized/peer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { NodeID } from '../types'

import { List, Map, Range, Seq } from 'immutable'
import wrtc from 'isomorphic-wrtc'
import SimplePeer, { SignalData } from 'simple-peer'

type MessageID = number
Expand Down Expand Up @@ -48,9 +49,9 @@ export class Peer {
chunks: Map<ChunkID, Buffer>
}>()

constructor (id: NodeID, opts?: SimplePeer.Options) {
constructor (id: NodeID, initiator: boolean = false) {
this.id = id
this.peer = new SimplePeer(opts)
this.peer = new SimplePeer({ wrtc, initiator })
}

send (msg: Buffer): void {
Expand Down Expand Up @@ -236,7 +237,6 @@ export class Peer {
readyMessages
.forEach((message) => {
// TODO debug
// @ts-expect-error
listener(message)
})
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@ describe.skip('peer pool', function () {
beforeEach(async () => {
const count = 3

pools = Map(await Promise.all(
Range(1, count + 1).map(String).map(async (id) =>
[id, await PeerPool.init(id)] as [NodeID, PeerPool]
).toArray()
pools = Map(Range(1, count + 1).map(String).map((id) =>
[id, new PeerPool(id)]
))
})

Expand Down
27 changes: 3 additions & 24 deletions discojs/discojs-core/src/client/decentralized/peer_pool.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { Map, Set } from 'immutable'
import { SignalData } from 'simple-peer'
import * as WRTC from '@roamhq/wrtc'

import { Peer } from './peer'
import { NodeID } from '../types'
Expand All @@ -11,25 +10,10 @@ import { PeerConnection, EventConnection } from '../event_connection'
export class PeerPool {
private peers = Map<NodeID, PeerConnection>()

private constructor (
private readonly id: NodeID,
private readonly wrtc?: typeof WRTC
constructor (
private readonly id: NodeID
) {}

static async init (id: NodeID): Promise<PeerPool> {
// needed on node
let wrtc: typeof WRTC | undefined
try {
// resolve relatively to where it is run, not from discojs dir
const path = require.resolve('@roamhq/wrtc', { paths: ['.'] })
wrtc = await import(path)
} catch (e) {
// expected
}

return new PeerPool(id, wrtc)
}

shutdown (): void {
console.info(`[${this.id}] shutdown their peers`)

Expand Down Expand Up @@ -62,12 +46,7 @@ export class PeerPool {

const newPeers = Map(peersToConnect
.filter((id) => !this.peers.has(id))
.map((id) => [id, id < this.id] as [NodeID, boolean])
.map(([id, initiator]) => {
const p = new Peer(id, { initiator, wrtc: this.wrtc })
// onNewPeer(id, p)
return [id, p]
}))
.map((id) => [id, new Peer(id, id < this.id)]))

console.info(`[${this.id}] asked to connect new peers:`, newPeers.keySeq().toJS())
const newPeersConnections = newPeers.map((peer, id) => new PeerConnection(this.id, peer, signallingServer))
Expand Down
1 change: 1 addition & 0 deletions discojs/discojs-node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
"homepage": "https://github.com/epfml/disco#readme",
"dependencies": {
"@epfml/discojs-core": "*",
"@roamhq/wrtc": "^0.7.1",
"@tensorflow/tfjs-node": "4"
},
"devDependencies": {
Expand Down
24 changes: 24 additions & 0 deletions isomorphic-wrtc/browser.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
"use strict";

// TODO taken from "get-browser-rtc"
function getBrowserRTC() {
if (typeof globalThis === "undefined") return null;
var wrtc = {
RTCPeerConnection:
globalThis.RTCPeerConnection ||
globalThis.mozRTCPeerConnection ||
globalThis.webkitRTCPeerConnection,
RTCSessionDescription:
globalThis.RTCSessionDescription ||
globalThis.mozRTCSessionDescription ||
globalThis.webkitRTCSessionDescription,
RTCIceCandidate:
globalThis.RTCIceCandidate ||
globalThis.mozRTCIceCandidate ||
globalThis.webkitRTCIceCandidate,
};
if (!wrtc.RTCPeerConnection) return null;
return wrtc;
}

module.exports = getBrowserRTC();
9 changes: 9 additions & 0 deletions isomorphic-wrtc/index.d.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
declare module 'isomorphic-wrtc' {
const wrtc: {
RTCPeerConnection: typeof RTCPeerConnection
RTCSessionDescription: typeof RTCSessionDescription
RTCIceCandidate: typeof RTCIceCandidate
}
export type WRTC = typeof wrtc
export default wrtc
}
3 changes: 3 additions & 0 deletions isomorphic-wrtc/node.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
"use strict";

module.exports = require("@roamhq/wrtc");
18 changes: 18 additions & 0 deletions isomorphic-wrtc/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"name": "isomorphic-wrtc",
"version": "1.0.0",
"description": "load WebRTC implementation available on platform",
"main": "node.js",
"browser": "browser.js",
"types": "index.d.ts",
"license": "ISC",
"scripts": {
"build": ": nothing",
"lint": ": nothing",
"test": ": nothing"
},
"peerDependencies": {
"@roamhq/wrtc": "^0.7.1",
"get-browser-rtc": "*"
}
}
Loading

0 comments on commit 993f8de

Please sign in to comment.