Skip to content

Commit

Permalink
setting up tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jribbink committed Nov 22, 2024
1 parent e74196e commit 2f3dcea
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 25 deletions.
47 changes: 47 additions & 0 deletions packages/transport-http/src/subscriptions/mocks/websocket.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import EventEmitter from "events"

export function createMockWebSocket() {
const eventEmitter = new EventEmitter()
eventEmitter.on("message", (e) => websocket.onmessage(e))
eventEmitter.on("error", (e) => websocket.onerror(e))
eventEmitter.on("close", (e) => websocket.onclose(e))
eventEmitter.on("open", (e) => websocket.onopen(e))

const websocket = {
onmessage: jest.fn(),
onclose: jest.fn(),
onopen: jest.fn(),
onerror: jest.fn(),
send: (data: any) => {
connection.onmessage(new MessageEvent("message", { data }))
},
readyState: 1,
}

const connection = {
send: (data: any) => {
eventEmitter.emit("message", new MessageEvent("message", { data: JSON.stringify(data) }))
},
sendError: (error: Error) => {
eventEmitter.emit("error", new ErrorEvent("error", { error }))
},
close: () => {
websocket.readyState = 3
eventEmitter.emit("close", new CloseEvent("close"))
},
onmessage: jest.fn(),
onclose: jest.fn(),
}

return {
WebSocket: jest.fn(() => {
console.log("NEW WEBSOCKET")
setTimeout(() => {
console.log("OPENING")
eventEmitter.emit("open", new Event("open"))
}, 1000)
return websocket
}),
mockConnection: connection
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import { createMockWebSocket } from "./mocks/websocket"
import { SubscribeMessageResponse } from "./models"
import { StreamController } from "./stream-controller"
import { SubscriptionResponse, SubscriptionTopic } from "./types"

let mockWs: ReturnType<typeof createMockWebSocket>

jest.mock("../websocket", () => ({
WebSocket: jest.fn().mockImplementation(() => mockWs.WebSocket),
}))

describe("StreamController", () => {
beforeEach(() => {
mockWs = createMockWebSocket()
})

test("constructor", () => {
const config = {
hostname: "hostname",
reconnectInterval: 1000,
reconnectAttempts: 10,
}
const streamController = new StreamController(config)
expect(streamController).toBeDefined()
})

test("subscribe sends subscribe message", async () => {
const config = {
hostname: "hostname",
reconnectInterval: 1000,
reconnectAttempts: 10,
}
const streamController = new StreamController(config)
const topic = "topic" as SubscriptionTopic
const args = { key: "value" } as any
const onData = jest.fn()
const onError = jest.fn()

console.log("YO")

mockWs.mockConnection.onmessage.mockImplementation((event) => {
console.log(event)
const data = JSON.parse(event.data)
expect(data).toEqual({
topic,
arguments: args,
})

const response: SubscribeMessageResponse = {
id: "id",
action: "subscribe",
success: true,
topic,
}
mockWs.mockConnection.send(response)
})

const subscription = await streamController.subscribe({
topic,
args,
onData,
onError,
})

expect(mockWs.mockConnection.onmessage).toHaveBeenCalledTimes(1)


})
})
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {MessageResponse, SubscriptionDataMessage} from "./models"
import {MessageResponse, SubscriptionDataMessage, UnsubscribeMessageResponse} from "./models"
import {
SubscribeMessageRequest,
SubscribeMessageResponse,
Expand All @@ -13,6 +13,8 @@ import {
} from "./types"
import {WebSocket} from "../websocket"

const WS_OPEN = 1

interface SubscriptionInfo<T extends SubscriptionTopic> {
// Internal ID for the subscription
id: number
Expand All @@ -28,24 +30,20 @@ interface SubscriptionInfo<T extends SubscriptionTopic> {
onError: (error: Error) => void
}

interface WebsocketTransportConfig {
interface StreamControllerConfig {
hostname: string
reconnectInterval: number
reconnectAttempts: number
}

export class WebsocketTransport implements SubscriptionTransport {
export class StreamController implements SubscriptionTransport {
private counter = 0
private subscriptions: SubscriptionInfo<SubscriptionTopic>[] = []

// WebSocket is only opened when there are active subscriptions
private socket: WebSocket | null = null

private config: WebsocketTransportConfig

private config: StreamControllerConfig
private reconnectAttempts = 0

constructor(config: WebsocketTransportConfig) {
constructor(config: StreamControllerConfig) {
this.config = {
hostname: config.hostname,
reconnectInterval: config.reconnectInterval || 1000,
Expand All @@ -54,19 +52,16 @@ export class WebsocketTransport implements SubscriptionTransport {
}

// Lazy connect to the socket when the first subscription is made
connect() {
private async connect() {
return new Promise<void>((resolve) => {
// If the socket is already open, do nothing
if (this.socket?.readyState === WebSocket.OPEN) {
if (this.socket?.readyState === WS_OPEN) {
return
}

// Restore subscriptions if the socket was closed
if (this.socket?.readyState === WebSocket.CLOSED) {
this.subscriptions.forEach(sub => {
this.socket?.send(JSON.stringify(sub))
})
}
console.log("CONNECTING")

console.log("WS", WebSocket)

this.socket = new WebSocket(this.config.hostname)
this.socket.onmessage = event => {
Expand Down Expand Up @@ -96,6 +91,7 @@ export class WebsocketTransport implements SubscriptionTransport {
}

this.socket.onopen = () => {
console.log("WebSocket connection established")
// Restore subscriptions
Promise.all(this.subscriptions.map(async sub => {
const response = await this.sendSubscribe(sub)
Expand Down Expand Up @@ -148,8 +144,11 @@ export class WebsocketTransport implements SubscriptionTransport {
onData: (data: SubscriptionResponse<T>) => void
onError: (error: Error) => void
}): Promise<Subscription> {
console.log("TRY CONN")
// Connect the socket if it's not already open
this.connect()
await this.connect()

console.log("CONN?")

// Track the subscription locally
const sub: SubscriptionInfo<T> = {
Expand Down Expand Up @@ -208,7 +207,7 @@ export class WebsocketTransport implements SubscriptionTransport {
}
this.socket?.send(JSON.stringify(request))

const response = await this.waitForResponse()
const response: SubscribeMessageResponse = await this.waitForResponse()

if (!response.success) {
throw new Error(
Expand All @@ -229,7 +228,7 @@ export class WebsocketTransport implements SubscriptionTransport {
}
this.socket?.send(JSON.stringify(request))

const response = await this.waitForResponse()
const response: UnsubscribeMessageResponse = await this.waitForResponse()

if (!response.success) {
throw new Error(
Expand All @@ -239,9 +238,15 @@ export class WebsocketTransport implements SubscriptionTransport {
}
}

private async waitForResponse() {
return new Promise<SubscribeMessageResponse>(resolve => {
// TODO: NOOP, waiting for AN team to decide what to do here
private async waitForResponse<T extends MessageResponse>(): Promise<T> {
// TODO: NOOP, waiting for AN team to decide what to do here, this is a placeholder
return new Promise(resolve => {
this.socket?.addEventListener("message", event => {
const data = JSON.parse(event.data) as T
if (data.action) {
resolve(data)
}
})
})
}

Expand All @@ -251,6 +256,6 @@ export class WebsocketTransport implements SubscriptionTransport {
sub: SubscriptionInfo<T>,
message: SubscriptionDataMessage
) {
// TODO: Will be implemented in the future
// TODO: Will be implemented with each subscription topic
}
}

This file was deleted.

0 comments on commit 2f3dcea

Please sign in to comment.