Skip to content

Commit

Permalink
wip: janus POC
Browse files Browse the repository at this point in the history
  • Loading branch information
farhat-ha committed Feb 29, 2024
1 parent 2aa0db9 commit 62ab0ed
Show file tree
Hide file tree
Showing 30 changed files with 5,023 additions and 178 deletions.
1 change: 1 addition & 0 deletions packages/js/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
"bugs:": "https://github.com/team-telnyx/webrtc/issues",
"license": "MIT",
"dependencies": {
"eventemitter3": "^5.0.1",
"loglevel": "^1.6.8",
"uuid": "^7.0.3"
},
Expand Down
69 changes: 69 additions & 0 deletions packages/js/src/Modules/Janus/Connection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/**
* Establishes a connection to the Janus server
* through a WebSocket connection.
*/
import EventEmitter from 'eventemitter3';
import { DEV_HOST } from './util/constants';

export default class JanusConnection extends EventEmitter {
public static PROTOCOL = 'janus-protocol';
private _socket: WebSocket | null = null;

public connect = () => {
// TODO - use PROD_HOST in production.
this._socket = new WebSocket(DEV_HOST, JanusConnection.PROTOCOL);
this._socket.addEventListener('open', this._onStateChange);
this._socket.addEventListener('close', this._onStateChange);
this._socket.addEventListener('message', this._onMessage);
this._socket.addEventListener('error', this._onError);
};

private _onError = (error: Event) => {
this.emit('error', error);
};
private _onMessage = (ev: MessageEvent) => {
this.emit('message', ev.data);
};

private _onStateChange = () => {
this.emit('stateChange', this._socket.readyState);
};

get connected(): boolean {
return this._socket && this._socket.readyState === WebSocket.OPEN;
}

get connecting(): boolean {
return this._socket && this._socket.readyState === WebSocket.CONNECTING;
}

get closing(): boolean {
return this._socket && this._socket.readyState === WebSocket.CLOSING;
}

get closed(): boolean {
return this._socket && this._socket.readyState === WebSocket.CLOSED;
}

get isAlive(): boolean {
return this.connecting || this.connected;
}

get isDead(): boolean {
return this.closing || this.closed;
}

public sendRaw(data: string | ArrayBuffer | Blob | ArrayBufferView): void {
{
this._socket.send(data);
}
}

public close() {
this._socket.close();
this._socket.removeEventListener('open', this._onStateChange);
this._socket.removeEventListener('message', this._onMessage);
this._socket.removeEventListener('close', this._onStateChange);
this._socket.removeEventListener('error', this._onError);
}
}
141 changes: 141 additions & 0 deletions packages/js/src/Modules/Janus/Handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
export const isFunction = (variable: any): boolean =>
variable instanceof Function || typeof variable === 'function';

type QueueMap = { [key: string]: Function[] };

const GLOBAL = 'GLOBAL';
const queue: QueueMap = {};
const _buildEventName = (event: string, uniqueId: string) =>
`${event}|${uniqueId}`;

const isQueued = (event: string, uniqueId: string = GLOBAL) => {
const eventName = _buildEventName(event, uniqueId);
return eventName in queue;
};

const queueLength = (event: string, uniqueId: string = GLOBAL): number => {
const eventName = _buildEventName(event, uniqueId);
return eventName in queue ? queue[eventName].length : 0;
};

/**
* Subscribes the callback to the passed event. Use uniqueId to render unique the event.
*/
const register = (
event: string,
callback: Function,
uniqueId: string = GLOBAL
) => {
const eventName = _buildEventName(event, uniqueId);
if (!(eventName in queue)) {
queue[eventName] = [];
}
queue[eventName].push(callback);
};

/**
* Subscribes the callback to the passed event only once. Use uniqueId to render unique the event.
*/
const registerOnce = (
event: string,
callback: Function,
uniqueId: string = GLOBAL
) => {
/* tslint:disable-next-line */
const cb = function (data) {
deRegister(event, cb, uniqueId);
callback(data);
};
cb.prototype.targetRef = callback;
return register(event, cb, uniqueId);
};

/**
* Remove subscription by callback. If not callback is passed in, all subscription will be removed.
*/
const deRegister = (
event: string,
callback?: Function | null,
uniqueId: string = GLOBAL
) => {
if (!isQueued(event, uniqueId)) {
return false;
}
const eventName = _buildEventName(event, uniqueId);
if (isFunction(callback)) {
const len = queue[eventName].length;
for (let i = len - 1; i >= 0; i--) {
const fn = queue[eventName][i];
if (
callback === fn ||
(fn.prototype && callback === fn.prototype.targetRef)
) {
queue[eventName].splice(i, 1);
}
}
} else {
queue[eventName] = [];
}
if (queue[eventName].length === 0) {
// Cleanup
delete queue[eventName];
}
return true;
};

/**
* Trigger the event, passing the data to it's subscribers. Use uniqueId to identify unique events.
*/
const trigger = (
event: string,
data: any,
uniqueId: string = GLOBAL,
globalPropagation: boolean = true
): boolean => {
const _propagate: boolean = globalPropagation && uniqueId !== GLOBAL;
if (!isQueued(event, uniqueId)) {
if (_propagate) {
trigger(event, data);
}
return false;
}
const eventName = _buildEventName(event, uniqueId);
const len = queue[eventName].length;
if (!len) {
if (_propagate) {
trigger(event, data);
}
return false;
}
for (let i = len - 1; i >= 0; i--) {
queue[eventName][i](data);
}
if (_propagate) {
trigger(event, data);
}
return true;
};

/**
* Remove all subscriptions
*/
const deRegisterAll = (event: string) => {
const eventName = _buildEventName(event, '');
Object.keys(queue)
.filter((name) => name.indexOf(eventName) === 0)
.forEach((event) => delete queue[event]);
};

const clearQueue = () =>
Object.keys(queue).forEach((event) => delete queue[event]);

export {
trigger,
register,
registerOnce,
deRegister,
deRegisterAll,
isQueued,
queueLength,
clearQueue,
};
6 changes: 6 additions & 0 deletions packages/js/src/Modules/Janus/KeepAliveAgent.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import JanusConnection from "./Connection";

export class KeepAliveAgent {
private connection: JanusConnection
// TODO - Implement Keep Alive Agent
}
55 changes: 55 additions & 0 deletions packages/js/src/Modules/Janus/Request.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
export enum Janus {
create = 'create',
success = 'success',
error = 'error',
attach = 'attach',
message = 'message',
event = 'event',
ack = 'ack'
}

type JanusCreateSessionRequest = {
janus: Janus.create;
transaction?: string;
};

type JanusAttachPluginRequest = {
janus: Janus.attach;
plugin: string;
session_id: number;
transaction?: string;
};

type JanusSIPRegisterRequest = {
janus: Janus.message;
body: {
request: 'register';
type?: string; //"<if guest or helper, no SIP REGISTER is actually sent; optional>",
send_register?: boolean; // <true|false; if false, no SIP REGISTER is actually sent; optional>,
force_udp?: boolean; // <true|false; if true, forces UDP for the SIP messaging; optional>,
force_tcp?: boolean; // <true|false; if true, forces TCP for the SIP messaging; optional>,
sips?: boolean; // <true|false; if true, configures a SIPS URI too when registering; optional>,
rfc2543_cancel?: boolean; // <true|false; if true, configures sip client to CANCEL pending INVITEs without having received a provisional response first; optional>,
username: string; // "<SIP URI to register; mandatory>",
secret?: string; // "<password to use to register; optional>",
ha1_secret?: string; // "<prehashed password to use to register; optional>",
authuser?: string; // "<username to use to authenticate (overrides the one in the SIP URI); optional>",
display_name?: string; // "<display name to use when sending SIP REGISTER; optional>",
user_agent?: string; //"<user agent to use when sending SIP REGISTER; optional>",
proxy?: string; //"<server to register at; optional, as won't be needed in case the REGISTER is not goint to be sent (e.g., guests)>";
outbound_proxy?: string; // '<outbound proxy to use, if any; optional>';
headers?: Record<string, string>; // '<object with key/value mappings (header name/value), to specify custom headers to add to the SIP REGISTER; optional>';
contact_params?: Record<string, string>[]; // "<array of key/value objects, to specify custom Contact URI params to add to the SIP REGISTER; optional>",
incoming_header_prefixes?: string[]; // "<array of strings, to specify custom (non-standard) headers to read on incoming SIP events; optional>",
refresh?: boolean; // "<true|false; if true, only uses the SIP REGISTER as an update and not a new registration; optional>",
master_id?: number; //"<ID of an already registered account, if this is an helper for multiple calls (more on that later); optional>",
register_ttl?: number; //: "<integer; number of seconds after which the registration should expire; optional>"
};
transaction?: string;
handle_id: number;
session_id: number;
};
export type JanusRequest =
| JanusCreateSessionRequest
| JanusAttachPluginRequest
| JanusSIPRegisterRequest;
Loading

0 comments on commit 62ab0ed

Please sign in to comment.