Skip to content

Commit

Permalink
types changes
Browse files Browse the repository at this point in the history
  • Loading branch information
jribbink committed Nov 24, 2024
1 parent 426bf40 commit e76e304
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 74 deletions.
1 change: 1 addition & 0 deletions packages/transport-http/src/sdk-send-http.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,5 +12,6 @@ export {sendGetNetworkParameters} from "./send-get-network-parameters"
export {sendGetNodeVersionInfo} from "./send-get-node-version-info"
export {connectSubscribeEvents} from "./connect-subscribe-events"
export {send} from "./send-http"
export {WsSubscriptionTransport as SubscriptionTransport} from "./subscriptions/ws-transport"
export {WebsocketError} from "./connect-ws"
export {HTTPRequestError} from "./http-request.js"
45 changes: 0 additions & 45 deletions packages/transport-http/src/subscriptions/types.ts

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@ import {
SubscriptionDataMessage,
UnsubscribeMessageRequest,
} from "./models"
import {StreamController} from "./stream-controller"
import {SubscriptionTopic} from "./types"
import {WsSubscriptionTransport} from "./ws-transport"
import {Subscription} from "@onflow/typedefs"

jest.mock("../websocket", () => ({
WebSocket: mockSocket,
}))

describe("StreamController", () => {
describe("WsSubscriptionTransport", () => {
let mockWs: WS
beforeEach(() => {
mockWs = new WS("wss://localhost:8080")
Expand All @@ -29,7 +29,7 @@ describe("StreamController", () => {
reconnectInterval: 1000,
reconnectAttempts: 10,
}
new StreamController(config)
new WsSubscriptionTransport(config)

await new Promise(resolve => setTimeout(resolve, 0))
expect(mockWs.server.clients).toHaveLength(0)
Expand All @@ -41,8 +41,8 @@ describe("StreamController", () => {
reconnectInterval: 1000,
reconnectAttempts: 10,
}
const streamController = new StreamController(config)
const topic = "topic" as SubscriptionTopic
const streamController = new WsSubscriptionTransport(config)
const topic = "topic" as Subscription.Topic
const args = {key: "value"} as any
const onData = jest.fn()
const onError = jest.fn()
Expand Down Expand Up @@ -93,8 +93,8 @@ describe("StreamController", () => {
reconnectInterval: 1000,
reconnectAttempts: 10,
}
const streamController = new StreamController(config)
const topic = "topic" as SubscriptionTopic
const streamController = new WsSubscriptionTransport(config)
const topic = "topic" as Subscription.Topic
const args = {key: "value"} as any
const onData = jest.fn()
const onError = jest.fn()
Expand Down Expand Up @@ -165,8 +165,8 @@ describe("StreamController", () => {
reconnectInterval: 1000,
reconnectAttempts: 1,
}
const streamController = new StreamController(config)
const topic = "topic" as SubscriptionTopic
const streamController = new WsSubscriptionTransport(config)
const topic = "topic" as Subscription.Topic
const args = {key: "value"} as any
const onData = jest.fn()
const onError = jest.fn()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,44 +10,40 @@ import {
} from "./models"
import {
Subscription,
SubscriptionArguments,
SubscriptionResponse,
SubscriptionTopic,
SubscriptionTransport,
} from "./types"
} from "@onflow/typedefs"
import {WebSocket} from "../websocket"

const WS_OPEN = 1

interface SubscriptionInfo<T extends SubscriptionTopic> {
interface SubscriptionInfo<T extends Subscription.Topic> {
// Internal ID for the subscription
id: number
// Remote ID assigned by the server used for message routing and unsubscribing
remoteId?: string
// The topic of the subscription
topic: T
// The checkpoint to resume the subscription from
checkpoint: SubscriptionArguments<T>
checkpoint: Subscription.Arguments<T>
// The callback to call when a data is received
onData: (data: any) => void
// The callback to call when an error occurs
onError: (error: Error) => void
}

interface StreamControllerConfig {
interface SubscriptionTransportConfig {
hostname: string
reconnectInterval: number
reconnectAttempts: number
}

export class StreamController implements SubscriptionTransport {
export class WsSubscriptionTransport implements Subscription.Transport {
private counter = 0
private subscriptions: SubscriptionInfo<SubscriptionTopic>[] = []
private subscriptions: SubscriptionInfo<Subscription.Topic>[] = []
private socket: WebSocket | null = null
private config: StreamControllerConfig
private config: SubscriptionTransportConfig
private reconnectAttempts = 0

constructor(config: StreamControllerConfig) {
constructor(config: SubscriptionTransportConfig) {
this.config = {
hostname: config.hostname,
reconnectInterval: config.reconnectInterval || 2000,
Expand Down Expand Up @@ -143,12 +139,12 @@ export class StreamController implements SubscriptionTransport {
this.reconnectAttempts = 0
}

async subscribe<T extends SubscriptionTopic>(opts: {
async subscribe<T extends Subscription.Topic>(opts: {
topic: T
args: SubscriptionArguments<T>
onData: (data: SubscriptionResponse<T>) => void
args: Subscription.Arguments<T>
onData: (data: Subscription.Response<T>) => void
onError: (error: Error) => void
}): Promise<Subscription> {
}): Promise<Subscription.Instance> {
// Connect the socket if it's not already open
await this.connect()

Expand Down Expand Up @@ -200,7 +196,7 @@ export class StreamController implements SubscriptionTransport {
}
}

private async sendSubscribe(sub: SubscriptionInfo<SubscriptionTopic>) {
private async sendSubscribe(sub: SubscriptionInfo<Subscription.Topic>) {
// Send the subscription message
const request: SubscribeMessageRequest = {
action: "subscribe",
Expand All @@ -220,7 +216,7 @@ export class StreamController implements SubscriptionTransport {
return response
}

private async sendUnsubscribe(sub: SubscriptionInfo<SubscriptionTopic>) {
private async sendUnsubscribe(sub: SubscriptionInfo<Subscription.Topic>) {
// Send the unsubscribe message if the subscription has a remote id
const {remoteId} = sub
if (remoteId) {
Expand Down Expand Up @@ -255,7 +251,7 @@ export class StreamController implements SubscriptionTransport {
// Update the subscription checkpoint when a message is received
// These checkpoints are used to resume subscriptions after disconnects
private updateSubscriptionCheckpoint<
T extends SubscriptionTopic = SubscriptionTopic,
T extends Subscription.Topic = Subscription.Topic,
>(sub: SubscriptionInfo<T>, message: SubscriptionDataMessage) {
// TODO: Will be implemented with each subscription topic
}
Expand Down
1 change: 1 addition & 0 deletions packages/typedefs/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -459,3 +459,4 @@ export type EventStream = StreamConnection<{

export * from "./interaction"
export * from "./fvm-errors"
export * as Subscription from "./subscriptions"
50 changes: 50 additions & 0 deletions packages/typedefs/src/subscriptions.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
export enum Topic {
EVENTS = "events",
BLOCKS = "blocks",
}

export type Schema = {
[Topic.EVENTS]: SchemaItem<
{
startBlock: number
endBlock: number
},
{
type: string
data: any
}
>
[Topic.BLOCKS]: SchemaItem<
{
startBlock: number
endBlock: number
},
{
type: string
data: any
}
>
}

export type SchemaItem<TArgs, TResponse> = {
args: TArgs
response: TResponse
}

export type Arguments<T extends Topic> =
Schema[T]["args"]
export type Response<T extends Topic> =
Schema[T]["response"]

export type Instance = {
unsubscribe: () => void
}

export type Transport = {
subscribe: <T extends Topic>(opts: {
topic: T
args: Arguments<T>
onData: (data: Response<T>) => void
onError: (error: Error) => void
}) => Promise<Instance>
}

0 comments on commit e76e304

Please sign in to comment.