Skip to content

Commit

Permalink
[WIP] Event streaming
Browse files Browse the repository at this point in the history
  • Loading branch information
jribbink committed Nov 9, 2023
1 parent 2950e79 commit f8c4a60
Show file tree
Hide file tree
Showing 14 changed files with 450 additions and 166 deletions.
188 changes: 79 additions & 109 deletions package-lock.json

Large diffs are not rendered by default.

23 changes: 23 additions & 0 deletions packages/sdk/src/build/build-subscribe-events.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import {pipe, Ok, makeSubscribeEvents} from "../interaction/interaction.js"

export function subscribeEvents({
startBlockId,
startHeight,
eventTypes,
addresses,
contracts,
heartbeatInterval,
}) {
return pipe([
makeSubscribeEvents,
ix => {
ix.subscribeEvents.startBlockId = startBlockId
ix.subscribeEvents.startHeight = startHeight
ix.subscribeEvents.eventTypes = eventTypes
ix.subscribeEvents.addresses = addresses
ix.subscribeEvents.contracts = contracts
ix.subscribeEvents.heartbeatInterval = heartbeatInterval
return Ok(ix)
},
])
}
26 changes: 26 additions & 0 deletions packages/sdk/src/build/build-subscribe-events.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import {interaction} from "../interaction/interaction.js"
import {subscribeEvents} from "./build-subscribe-events.js"

describe("Subscribe Events", () => {
test("Subscribe Events", async () => {
let ix = await subscribeEvents({
startBlockId: "abc123",
startHeight: 1,
eventTypes: ["A.7e60df042a9c0868.FlowToken.TokensWithdrawn"],
addresses: ["0x1", "0x2"],
contracts: ["A.7e60df042a9c0868.FlowToken"],
heartbeatInterval: 1000,
})(interaction())

expect(ix.subscribeEvents.startBlockId).toBe("abc123")
expect(ix.subscribeEvents.startHeight).toBe(1)
expect(ix.subscribeEvents.eventTypes).toEqual([
"A.7e60df042a9c0868.FlowToken.TokensWithdrawn",
])
expect(ix.subscribeEvents.addresses).toEqual(["0x1", "0x2"])
expect(ix.subscribeEvents.contracts).toEqual([
"A.7e60df042a9c0868.FlowToken",
])
expect(ix.subscribeEvents.heartbeatInterval).toBe(1000)
})
})
8 changes: 8 additions & 0 deletions packages/sdk/src/build/build-subscription.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
export function subscription({onData, onError, onComplete}) {
return ix => {
ix.subscription.onData = onData
ix.subscription.onError = onError
ix.subscription.onComplete = onComplete
return ix
}
}
20 changes: 20 additions & 0 deletions packages/sdk/src/build/build-subscription.test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import {interaction} from "../interaction/interaction.js"
import {subscription} from "./build-subscription.js"

describe("Build Subscription", () => {
test("build subscription", async () => {
const onData = () => {}
const onError = () => {}
const onComplete = () => {}

let ix = await subscription({
onData,
onError,
onComplete,
})(interaction())

expect(ix.subscription.onData).toBe(onData)
expect(ix.subscription.onError).toBe(onError)
expect(ix.subscription.onComplete).toBe(onComplete)
})
})
2 changes: 2 additions & 0 deletions packages/sdk/src/decode/decode.js
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ export const decodeResponse = async (response, customDecoders = {}) => {
return {
chainId: chainIdMap[response.networkParameters.chainId],
}
} else if (response.unsubscribeCallback) {
return response.unsubscribeCallback
}

return null
Expand Down
116 changes: 68 additions & 48 deletions packages/sdk/src/interaction/interaction.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export const TRANSACTION /* */ = "TRANSACTION"
export const GET_TRANSACTION_STATUS /* */ = "GET_TRANSACTION_STATUS"
export const GET_ACCOUNT /* */ = "GET_ACCOUNT"
export const GET_EVENTS /* */ = "GET_EVENTS"
export const SUBSCRIBE_EVENTS /* */ = "SUBSCRIBE_EVENTS"
export const PING /* */ = "PING"
export const GET_TRANSACTION /* */ = "GET_TRANSACTION"
export const GET_BLOCK /* */ = "GET_BLOCK"
Expand Down Expand Up @@ -89,6 +90,14 @@ const IX = `{
"end":null,
"blockIds":[]
},
"subscribeEvents": {
"startBlockId":null,
"startHeight":null,
"eventTypes":null,
"addresses":null,
"contracts":null,
"heartbeatInterval":null
},
"transaction": {
"id":null
},
Expand All @@ -102,6 +111,11 @@ const IX = `{
},
"collection": {
"id":null
},
"subscription": {
"onData":null,
"onError":null,
"onComplete":null
}
}`

Expand Down Expand Up @@ -147,52 +161,54 @@ const prepAccountKeyId = acct => {
}
}

export const prepAccount = (acct, opts = {}) => ix => {
invariant(
typeof acct === "function" || typeof acct === "object",
"prepAccount must be passed an authorization function or an account object"
)
invariant(opts.role != null, "Account must have a role")

const ACCOUNT = JSON.parse(ACCT)
const role = opts.role
const tempId = uuidv4()

if (acct.authorization && isFn(acct.authorization))
acct = {resolve: acct.authorization}
if (!acct.authorization && isFn(acct)) acct = {resolve: acct}

const resolve = acct.resolve
if (resolve)
acct.resolve = (acct, ...rest) =>
[resolve, prepAccountKeyId].reduce(
async (d, fn) => fn(await d, ...rest),
acct
)
acct = prepAccountKeyId(acct)

ix.accounts[tempId] = {
...ACCOUNT,
tempId,
...acct,
role: {
...ACCOUNT.role,
...(typeof acct.role === "object" ? acct.role : {}),
[role]: true,
},
}

if (role === AUTHORIZER) {
ix.authorizations.push(tempId)
} else if (role === PAYER) {
ix.payer.push(tempId)
} else {
ix[role] = tempId
export const prepAccount =
(acct, opts = {}) =>
ix => {
invariant(
typeof acct === "function" || typeof acct === "object",
"prepAccount must be passed an authorization function or an account object"
)
invariant(opts.role != null, "Account must have a role")

const ACCOUNT = JSON.parse(ACCT)
const role = opts.role
const tempId = uuidv4()

if (acct.authorization && isFn(acct.authorization))
acct = {resolve: acct.authorization}
if (!acct.authorization && isFn(acct)) acct = {resolve: acct}

const resolve = acct.resolve
if (resolve)
acct.resolve = (acct, ...rest) =>
[resolve, prepAccountKeyId].reduce(
async (d, fn) => fn(await d, ...rest),
acct
)
acct = prepAccountKeyId(acct)

ix.accounts[tempId] = {
...ACCOUNT,
tempId,
...acct,
role: {
...ACCOUNT.role,
...(typeof acct.role === "object" ? acct.role : {}),
[role]: true,
},
}

if (role === AUTHORIZER) {
ix.authorizations.push(tempId)
} else if (role === PAYER) {
ix.payer.push(tempId)
} else {
ix[role] = tempId
}

return ix
}

return ix
}

export const makeArgument = arg => ix => {
let tempId = uuidv4()
ix.message.arguments.push(tempId)
Expand All @@ -217,6 +233,7 @@ export const makeGetTransactionStatus /* */ = makeIx(GET_TRANSACTION_STATUS)
export const makeGetTransaction /* */ = makeIx(GET_TRANSACTION)
export const makeGetAccount /* */ = makeIx(GET_ACCOUNT)
export const makeGetEvents /* */ = makeIx(GET_EVENTS)
export const makeSubscribeEvents /* */ = makeIx(SUBSCRIBE_EVENTS)
export const makePing /* */ = makeIx(PING)
export const makeGetBlock /* */ = makeIx(GET_BLOCK)
export const makeGetBlockHeader /* */ = makeIx(GET_BLOCK_HEADER)
Expand All @@ -232,6 +249,7 @@ export const isGetTransactionStatus /* */ = is(GET_TRANSACTION_STATUS)
export const isGetTransaction /* */ = is(GET_TRANSACTION)
export const isGetAccount /* */ = is(GET_ACCOUNT)
export const isGetEvents /* */ = is(GET_EVENTS)
export const isSubscribeEvents /* */ = is(SUBSCRIBE_EVENTS)
export const isPing /* */ = is(PING)
export const isGetBlock /* */ = is(GET_BLOCK)
export const isGetBlockHeader /* */ = is(GET_BLOCK_HEADER)
Expand Down Expand Up @@ -287,10 +305,12 @@ export const put = (key, value) => ix => {
return Ok(ix)
}

export const update = (key, fn = identity) => ix => {
ix.assigns[key] = fn(ix.assigns[key], ix)
return Ok(ix)
}
export const update =
(key, fn = identity) =>
ix => {
ix.assigns[key] = fn(ix.assigns[key], ix)
return Ok(ix)
}

export const destroy = key => ix => {
delete ix.assigns[key]
Expand Down
3 changes: 2 additions & 1 deletion packages/sdk/src/response/response.js
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ const DEFAULT_RESPONSE = `{
"blockHeader":null,
"latestBlock":null,
"collection":null,
"networkParameters":null
"networkParameters":null,
"unsubscribeCallback":null,
}`

export const response = () => JSON.parse(DEFAULT_RESPONSE)
9 changes: 1 addition & 8 deletions packages/transport-http/src/http-request.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import * as logger from "@onflow/util-logger"
import fetchTransport from "cross-fetch"
import {safeParseJSON} from "./utils"

const AbortController =
globalThis.AbortController || require("abort-controller")
Expand Down Expand Up @@ -161,11 +162,3 @@ export async function httpRequest({
// Keep retrying request until server available or max attempts exceeded
return await requestLoop()
}

function safeParseJSON(data) {
try {
return JSON.parse(data)
} catch {
return null
}
}
3 changes: 3 additions & 0 deletions packages/transport-http/src/send-http.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {sendGetTransaction} from "./send-get-transaction.js"
import {sendExecuteScript} from "./send-execute-script.js"
import {sendGetAccount} from "./send-get-account.js"
import {sendGetEvents} from "./send-get-events.js"
import {sendSubscribeEvents} from "./send-subscribe-events.js"
import {sendGetBlock} from "./send-get-block.js"
import {sendGetBlockHeader} from "./send-get-block-header.js"
import {sendGetCollection} from "./send-get-collection.js"
Expand Down Expand Up @@ -34,6 +35,8 @@ export const send = async (ix, context = {}, opts = {}) => {
return opts.sendGetAccount ? opts.sendGetAccount(ix, context, opts) : sendGetAccount(ix, context, opts)
case context.ix.isGetEvents(ix):
return opts.sendGetEvents ? opts.sendGetEvents(ix, context, opts) : sendGetEvents(ix, context, opts)
case context.ix.isSubscribeEvents(ix):
return opts.sendSubscribeEvents ? opts.sendSubscribeEvents(ix, context, opts) : sendSubscribeEvents(ix, context, opts)
case context.ix.isGetBlock(ix):
return opts.sendGetBlock ? opts.sendGetBlock(ix, context, opts) : sendGetBlock(ix, context, opts)
case context.ix.isGetBlockHeader(ix):
Expand Down
73 changes: 73 additions & 0 deletions packages/transport-http/src/send-subscribe-events.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import {invariant} from "@onflow/util-invariant"
import {subscribeWs as defaultSubscribeWs} from "./subscribe-ws"

function constructData(ix, context, data) {
let ret = context.response()
ret.tag = ix.tag

ret.events = data.events
? data.events.map(event => ({
blockId: data.BlockId,
blockHeight: Number(data.Height),
blockTimestamp: data.Timestamp,
type: event.Type,
transactionId: event.TransactionId,
transactionIndex: Number(event.TransactionIndex),
eventIndex: Number(event.EventIndex),
payload: JSON.parse(
context.Buffer.from(event.Payload, "base64").toString()
),
}))
: []

return ret
}

function constructResponse(ix, context, callback) {
let ret = context.response()
ret.tag = ix.tag

ret.unsubscribeCallback = callback

return ret
}

export async function sendSubscribeEvents(ix, context = {}, opts = {}) {
invariant(opts.node, `SDK Send Get Events Error: opts.node must be defined.`)
invariant(
context.response,
`SDK Send Get Events Error: context.response must be defined.`
)
invariant(
context.Buffer,
`SDK Send Get Events Error: context.Buffer must be defined.`
)

ix = await ix

const subscribeWs = opts.subscribeWs || defaultSubscribeWs
const {onData, onError, onComplete} = ix.subscription

const unsubscribe = subscribeWs({
hostname: opts.node,
path: `/v1/subscribe_events`,
params: {
start_block_id: ix.subscribeEvents.startBlockId,
start_height: ix.subscribeEvents.startHeight,
event_types: ix.subscribeEvents.eventTypes,
addresses: ix.subscribeEvents.addresses,
contracts: ix.subscribeEvents.contracts,
heartbeat_interval: ix.subscribeEvents.heartbeatInterval,
},
onData: handleData,
onError,
onComplete,
})

function handleData(data) {
const resp = constructData(ix, context, data)
onData(resp)
}

return constructResponse(ix, context, unsubscribe)
}
Loading

0 comments on commit f8c4a60

Please sign in to comment.