Skip to content

Commit

Permalink
update arg format
Browse files Browse the repository at this point in the history
  • Loading branch information
jribbink committed Nov 29, 2024
1 parent 42a95f4 commit aaf89ca
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 25 deletions.
40 changes: 37 additions & 3 deletions packages/transport-http/src/subscribe/handlers/blocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,22 @@ type BlockDataModel = {
}
}

type BlockArgsModel =
| {
block_status?: number
start_block_id?: string
}
| {
block_status?: number
start_block_height?: number
}

export const blocksHandler = createSubscriptionHandler<{
Topic: SdkTransport.SubscriptionTopic.BLOCKS
Args: SdkTransport.SubscriptionArguments<SdkTransport.SubscriptionTopic.BLOCKS>
Data: SdkTransport.SubscriptionData<SdkTransport.SubscriptionTopic.BLOCKS>
RawData: BlockDataModel
ArgsModel: BlockArgsModel
DataModel: BlockDataModel
}>({
topic: SdkTransport.SubscriptionTopic.BLOCKS,
createSubscriber: (initialArgs, onData, onError) => {
Expand Down Expand Up @@ -56,15 +67,38 @@ export const blocksHandler = createSubscriptionHandler<{

// Update the resume args
resumeArgs = {
block_status: resumeArgs.block_status,
start_block_id: data.block.id,
blockStatus: resumeArgs.blockStatus,
startBlockHeight: data.block.height + 1,
}

onData(parsedData)
},
sendError(error: Error) {
onError(error)
},
encodeArgs(
args: SdkTransport.SubscriptionArguments<SdkTransport.SubscriptionTopic.BLOCKS>
) {
let encodedArgs: BlockArgsModel = {
block_status: args.blockStatus,
}

if ("startBlockHeight" in args) {
return {
...encodedArgs,
start_block_height: args.startBlockHeight,
}
}

if ("startBlockId" in args) {
return {
...encodedArgs,
start_block_id: args.startBlockId,
}
}

return encodedArgs
},
get connectionArgs() {
return resumeArgs
},
Expand Down
17 changes: 12 additions & 5 deletions packages/transport-http/src/subscribe/handlers/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,28 +8,34 @@ export interface SubscriptionHandler<
Topic: string
Args: any
Data: any
RawData: any
DataModel: any
ArgsModel: any
},
> {
readonly topic: T["Topic"]
createSubscriber(
initialArgs: T["Args"],
onData: (data: T["Data"]) => void,
onError: (error: Error) => void
): DataSubscriber<T["Args"], T["RawData"]>
): DataSubscriber<T["Args"], T["ArgsModel"], T["DataModel"]>
}

export interface DataSubscriber<Args, RawData> {
export interface DataSubscriber<Args, ArgsModel, Data> {
/**
* The callback to call when a data is received
*/
sendData(data: RawData): void
sendData(data: Data): void

/**
* The callback to call when an error is received
*/
sendError(error: Error): void

/**
* The arguments to connect or reconnect to the subscription
*/
encodeArgs(args: Args): ArgsModel

/**
* Get the arguments to connect or reconnect to the subscription
*/
Expand All @@ -41,7 +47,8 @@ export function createSubscriptionHandler<
Topic: string
Args: any
Data: any
RawData: any
DataModel: any
ArgsModel: any
},
>(handler: SubscriptionHandler<T>): SubscriptionHandler<T> {
return handler
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jest.mock("./websocket", () => ({

describe("WebSocket Manager", () => {
let mockWs: WS
let mockSubscriber: jest.Mocked<DataSubscriber<any, any>>
let mockSubscriber: jest.Mocked<DataSubscriber<any, any, any>>
let mockHandler: jest.Mocked<SubscriptionHandler<any>>
const mockConnectionArgs = {mock: "connection args"}

Expand All @@ -31,6 +31,7 @@ describe("WebSocket Manager", () => {
mockSubscriber = {
sendData: jest.fn(),
sendError: jest.fn(),
encodeArgs: jest.fn().mockReturnValue(mockConnectionArgs),
get connectionArgs() {
return mockConnectionArgs
},
Expand Down
14 changes: 8 additions & 6 deletions packages/transport-http/src/subscribe/subscription-manager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,15 @@ type DeepRequired<T> = Required<{

type InferHandler<T> = T extends SubscriptionHandler<infer H> ? H : never

interface SubscriptionInfo<T extends DataSubscriber<any, any>> {
interface SubscriptionInfo<T extends DataSubscriber<any, any, any>> {
// Internal ID for the subscription
id: number
// Remote ID assigned by the server used for message routing and unsubscribing
remoteId?: string
// The topic of the subscription
topic: string
// Data provider for the subscription
subscriber: DataSubscriber<any, any>
subscriber: DataSubscriber<any, any, any>
}

export interface SubscriptionManagerConfig {
Expand Down Expand Up @@ -65,7 +65,7 @@ export class SubscriptionManager<
> {
private counter = 0
private socket: WebSocket | null = null
private subscriptions: SubscriptionInfo<DataSubscriber<any, any>>[] = []
private subscriptions: SubscriptionInfo<DataSubscriber<any, any, any>>[] = []
private config: DeepRequired<SubscriptionManagerConfig>
private reconnectAttempts = 0
private handlers: Record<string, SubscriptionHandler<any>>
Expand Down Expand Up @@ -205,7 +205,7 @@ export class SubscriptionManager<
)

// Track the subscription locally
const sub: SubscriptionInfo<DataSubscriber<any, any>> = {
const sub: SubscriptionInfo<DataSubscriber<any, any, any>> = {
id: this.counter++,
topic: opts.topic,
subscriber: subscriber,
Expand Down Expand Up @@ -250,7 +250,9 @@ export class SubscriptionManager<
}
}

private async sendSubscribe(sub: SubscriptionInfo<DataSubscriber<any, any>>) {
private async sendSubscribe(
sub: SubscriptionInfo<DataSubscriber<any, any, any>>
) {
// Send the subscription message
const request: SubscribeMessageRequest = {
action: Action.SUBSCRIBE,
Expand All @@ -271,7 +273,7 @@ export class SubscriptionManager<
}

private async sendUnsubscribe(
sub: SubscriptionInfo<DataSubscriber<any, any>>
sub: SubscriptionInfo<DataSubscriber<any, any, any>>
) {
// Send the unsubscribe message if the subscription has a remote id
const {remoteId} = sub
Expand Down
18 changes: 8 additions & 10 deletions packages/typedefs/src/sdk-transport/subscriptions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,14 @@ export enum SubscriptionTopic {

export type SubscriptionSchema = {
[SubscriptionTopic.BLOCKS]: SchemaItem<
{
block_status?: number
} & (
| {
start_block_id?: string
}
| {
start_block_height?: number
}
),
| {
blockStatus?: number
startBlockId?: string
}
| {
blockStatus?: number
startBlockHeight?: number
},
{
block: Block
}
Expand Down

0 comments on commit aaf89ca

Please sign in to comment.