From c33ed209a14fed968e6a5f421c00b2225073e3dd Mon Sep 17 00:00:00 2001 From: farhanW3 <132962163+farhanW3@users.noreply.github.com> Date: Fri, 24 Nov 2023 20:08:39 +0530 Subject: [PATCH] Webhook Updates + On-Chain Tx Status Add (#314) * added new column onChainTxStatus based on receipt status. Solved webhook multisend using locks * updates example * updated UserOp Tx Receipt update to use prisma Transactions too * updated userOp tx rceipt check. Updated getTxById to not use locking when not needed * updated load test to ignore TLS/SSL when using https for testing * updated SDK to latest to get MetaTransaction Updates * updated return type for raw query in Transactions tbl * webhooks updates for multi-serer setup. UserOp Status data added * moved sendWebhookForQueueIds post update statement --- package.json | 2 +- src/db/transactions/getQueuedTxs.ts | 36 ++--- src/db/transactions/getSentTxs.ts | 39 ++--- src/db/transactions/getSentUserOps.ts | 41 ++---- src/db/transactions/getTxById.ts | 2 +- src/db/transactions/getTxByIds.ts | 30 ++++ src/db/transactions/updateTx.ts | 12 ++ .../migration.sql | 2 + src/prisma/schema.prisma | 1 + src/server/routes/transaction/status.ts | 1 + src/server/schemas/transaction/index.ts | 1 + src/server/utils/webhook.ts | 118 ++++++++------- src/worker/listeners/queuedTxListener.ts | 3 - src/worker/listeners/updateTxListener.ts | 4 - src/worker/tasks/processTx.ts | 13 +- src/worker/tasks/updateMinedTx.ts | 126 +++++++++------- src/worker/tasks/updateMinedUserOps.ts | 137 +++++++++++------- test/load/index.ts | 2 + yarn.lock | 65 ++++++++- 19 files changed, 384 insertions(+), 251 deletions(-) create mode 100644 src/db/transactions/getTxByIds.ts create mode 100644 src/prisma/migrations/20231123064817_added_onchain_status_flag_to_tx/migration.sql diff --git a/package.json b/package.json index bf980f439..004cefa11 100644 --- a/package.json +++ b/package.json @@ -43,7 +43,7 @@ "@t3-oss/env-core": "^0.6.0", "@thirdweb-dev/auth": "^4.1.0-nightly-c238fde8-20231020022304", "@thirdweb-dev/chains": "^0.1.55", - "@thirdweb-dev/sdk": "^4.0.13", + "@thirdweb-dev/sdk": "^4.0.17", "@thirdweb-dev/service-utils": "^0.4.2", "@thirdweb-dev/wallets": "^2.1.5", "body-parser": "^1.20.2", diff --git a/src/db/transactions/getQueuedTxs.ts b/src/db/transactions/getQueuedTxs.ts index 39f823cd6..2a6b065ee 100644 --- a/src/db/transactions/getQueuedTxs.ts +++ b/src/db/transactions/getQueuedTxs.ts @@ -1,3 +1,4 @@ +import { Transactions } from "@prisma/client"; import { Static } from "@sinclair/typebox"; import { PrismaTransaction } from "../../schema/prisma"; import { transactionResponseSchema } from "../../server/schemas/transaction"; @@ -16,24 +17,23 @@ export const getQueuedTxs = async ({ pgtx }: GetQueuedTxsParams = {}): Promise< const config = await getConfiguration(); // TODO: Don't use env var for transactions to batch - const txs = await prisma.$queryRaw` -SELECT - * -FROM - "transactions" -WHERE - "processedAt" IS NULL - AND "sentAt" IS NULL - AND "minedAt" IS NULL - AND "cancelledAt" IS NULL -ORDER BY - "queuedAt" -ASC -LIMIT - ${config.maxTxsToProcess} -FOR UPDATE SKIP LOCKED + const txs = await prisma.$queryRaw` + SELECT + * + FROM + "transactions" + WHERE + "processedAt" IS NULL + AND "sentAt" IS NULL + AND "minedAt" IS NULL + AND "cancelledAt" IS NULL + ORDER BY + "queuedAt" + ASC + LIMIT + ${config.maxTxsToProcess} + FOR UPDATE SKIP LOCKED `; - // TODO: This might not work! - return cleanTxs(txs as any); + return cleanTxs(txs); }; diff --git a/src/db/transactions/getSentTxs.ts b/src/db/transactions/getSentTxs.ts index 7f4647934..b1a0f10c8 100644 --- a/src/db/transactions/getSentTxs.ts +++ b/src/db/transactions/getSentTxs.ts @@ -13,30 +13,17 @@ export const getSentTxs = async ({ pgtx }: GetSentTxsParams = {}): Promise< const prisma = getPrismaWithPostgresTx(pgtx); const config = await getConfiguration(); - return prisma.transactions.findMany({ - where: { - processedAt: { - not: null, - }, - sentAt: { - not: null, - }, - transactionHash: { - not: null, - }, - accountAddress: null, - minedAt: null, - errorMessage: null, - retryCount: { - // TODO: What should the max retries be here? - lt: 3, - }, - }, - orderBy: [ - { - sentAt: "asc", - }, - ], - take: config.maxTxsToUpdate, - }); + return prisma.$queryRaw` + SELECT * FROM "transactions" + WHERE "processedAt" IS NOT NULL + AND "sentAt" IS NOT NULL + AND "transactionHash" IS NOT NULL + AND "accountAddress" IS NULL + AND "minedAt" IS NULL + AND "errorMessage" IS NULL + AND "retryCount" < ${config.maxTxsToUpdate} + ORDER BY "sentAt" ASC + LIMIT ${config.maxTxsToUpdate} + FOR UPDATE SKIP LOCKED + `; }; diff --git a/src/db/transactions/getSentUserOps.ts b/src/db/transactions/getSentUserOps.ts index f08bfa9c8..11a12208d 100644 --- a/src/db/transactions/getSentUserOps.ts +++ b/src/db/transactions/getSentUserOps.ts @@ -13,32 +13,17 @@ export const getSentUserOps = async ({ const prisma = getPrismaWithPostgresTx(pgtx); const config = await getConfiguration(); - return prisma.transactions.findMany({ - where: { - processedAt: { - not: null, - }, - sentAt: { - not: null, - }, - accountAddress: { - not: null, - }, - userOpHash: { - not: null, - }, - minedAt: null, - errorMessage: null, - retryCount: { - // TODO: What should the max retries be here? - lt: 3, - }, - }, - orderBy: [ - { - sentAt: "asc", - }, - ], - take: config.maxTxsToUpdate, - }); + return prisma.$queryRaw` + SELECT * FROM "transactions" + WHERE "processedAt" IS NOT NULL + AND "sentAt" IS NOT NULL + AND "accountAddress" IS NOT NULL + AND "userOpHash" IS NOT NULL + AND "minedAt" IS NULL + AND "errorMessage" IS NULL + AND "retryCount" < 3 + ORDER BY "sentAt" ASC + LIMIT ${config.maxTxsToUpdate} + FOR UPDATE SKIP LOCKED; + `; }; diff --git a/src/db/transactions/getTxById.ts b/src/db/transactions/getTxById.ts index 721aa1bf0..d434a7a54 100644 --- a/src/db/transactions/getTxById.ts +++ b/src/db/transactions/getTxById.ts @@ -16,12 +16,12 @@ export const getTxById = async ({ typeof transactionResponseSchema > | null> => { const prisma = getPrismaWithPostgresTx(pgtx); - const tx = await prisma.transactions.findUnique({ where: { id: queueId, }, }); + if (!tx) { return null; } diff --git a/src/db/transactions/getTxByIds.ts b/src/db/transactions/getTxByIds.ts new file mode 100644 index 000000000..5ef87a9e2 --- /dev/null +++ b/src/db/transactions/getTxByIds.ts @@ -0,0 +1,30 @@ +import { Static } from "@sinclair/typebox"; +import { PrismaTransaction } from "../../schema/prisma"; +import { transactionResponseSchema } from "../../server/schemas/transaction"; +import { prisma } from "../client"; +import { cleanTxs } from "./cleanTxs"; +interface GetTxByIdsParams { + queueIds: string[]; + pgtx?: PrismaTransaction; +} + +export const getTxByIds = async ({ + queueIds, +}: GetTxByIdsParams): Promise< + Static[] | null +> => { + const tx = await prisma.transactions.findMany({ + where: { + id: { + in: queueIds, + }, + }, + }); + + if (!tx || tx.length === 0) { + return null; + } + + const cleanedTx = cleanTxs(tx); + return cleanedTx; +}; diff --git a/src/db/transactions/updateTx.ts b/src/db/transactions/updateTx.ts index a83423ad4..76a5e3da9 100644 --- a/src/db/transactions/updateTx.ts +++ b/src/db/transactions/updateTx.ts @@ -35,6 +35,12 @@ type UpdateTxData = gasPrice?: string; blockNumber?: number; minedAt: Date; + onChainTxStatus?: number; + transactionHash?: string; + transactionType?: number; + gasLimit?: string; + maxFeePerGas?: string; + maxPriorityFeePerGas?: string; }; export const updateTx = async ({ pgtx, queueId, data }: UpdateTxParams) => { @@ -109,6 +115,12 @@ export const updateTx = async ({ pgtx, queueId, data }: UpdateTxParams) => { minedAt: data.minedAt, gasPrice: data.gasPrice, blockNumber: data.blockNumber, + onChainTxStatus: data.onChainTxStatus, + transactionHash: data.transactionHash, + transactionType: data.transactionType, + gasLimit: data.gasLimit, + maxFeePerGas: data.maxFeePerGas, + maxPriorityFeePerGas: data.maxPriorityFeePerGas, }, }); break; diff --git a/src/prisma/migrations/20231123064817_added_onchain_status_flag_to_tx/migration.sql b/src/prisma/migrations/20231123064817_added_onchain_status_flag_to_tx/migration.sql new file mode 100644 index 000000000..d1954b6d2 --- /dev/null +++ b/src/prisma/migrations/20231123064817_added_onchain_status_flag_to_tx/migration.sql @@ -0,0 +1,2 @@ +-- AlterTable +ALTER TABLE "transactions" ADD COLUMN "onChainTxStatus" INTEGER; diff --git a/src/prisma/schema.prisma b/src/prisma/schema.prisma index 254666113..892823593 100644 --- a/src/prisma/schema.prisma +++ b/src/prisma/schema.prisma @@ -110,6 +110,7 @@ model Transactions { gasPrice String? @map("gasPrice") transactionType Int? @map("transactionType") transactionHash String? @map("transactionHash") + onChainTxStatus Int? @map("onChainTxStatus") // User Operation signerAddress String? @map("signerAddress") accountAddress String? @map("accountAddress") diff --git a/src/server/routes/transaction/status.ts b/src/server/routes/transaction/status.ts index e6ddb2078..3ce247f10 100644 --- a/src/server/routes/transaction/status.ts +++ b/src/server/routes/transaction/status.ts @@ -54,6 +54,7 @@ responseBodySchema.example = { errorMessage: "", txMinedTimestamp: "2023-08-25T22:42:33.000Z", blockNumber: 39398545, + onChainTxStatus: 1, }, }; diff --git a/src/server/schemas/transaction/index.ts b/src/server/schemas/transaction/index.ts index d2965ee53..819e64613 100644 --- a/src/server/schemas/transaction/index.ts +++ b/src/server/schemas/transaction/index.ts @@ -187,6 +187,7 @@ export const transactionResponseSchema = Type.Object({ userOpHash: Type.Union([Type.String(), Type.Null()]), functionName: Type.Union([Type.String(), Type.Null()]), functionArgs: Type.Union([Type.String(), Type.Null()]), + onChainTxStatus: Type.Union([Type.Number(), Type.Null()]), }); export enum TransactionStatusEnum { diff --git a/src/server/utils/webhook.ts b/src/server/utils/webhook.ts index 74a84a243..81cf46cdd 100644 --- a/src/server/utils/webhook.ts +++ b/src/server/utils/webhook.ts @@ -1,6 +1,5 @@ import crypto from "crypto"; -import { getConfiguration } from "../../db/configuration/getConfiguration"; -import { getTxById } from "../../db/transactions/getTxById"; +import { getTxByIds } from "../../db/transactions/getTxByIds"; import { SanitizedWebHooksSchema, WalletBalanceWebhookSchema, @@ -12,10 +11,6 @@ import { TransactionStatusEnum } from "../schemas/transaction"; let balanceNotificationLastSentAt = -1; -interface TxWebookParams { - id: string; -} - export const generateSignature = ( body: Record, timestamp: string, @@ -74,65 +69,68 @@ export const sendWebhookRequest = async ( return true; }; -export const sendTxWebhook = async (data: TxWebookParams): Promise => { +export const sendTxWebhook = async (queueIds: string[]): Promise => { try { - const txData = await getTxById({ queueId: data.id }); - if (!txData) { - throw new Error(`Transaction ${data.id} not found.`); - } - - let webhookConfig: SanitizedWebHooksSchema[] | undefined = - await getWebhookConfig(WebhooksEventTypes.ALL_TX); - - // For Backwards Compatibility - const config = await getConfiguration(); - if (config?.webhookUrl && config?.webhookAuthBearerToken) { - const newFormatWebhookData = { - id: 0, - url: config.webhookUrl, - secret: config.webhookAuthBearerToken, - active: true, - eventType: WebhooksEventTypes.ALL_TX, - createdAt: new Date().toISOString(), - name: "Legacy Webhook", - }; - await sendWebhookRequest(newFormatWebhookData, txData); + const txDataByIds = await getTxByIds({ queueIds }); + if (!txDataByIds || txDataByIds.length === 0) { return; } - - if (!webhookConfig) { - switch (txData.status) { - case TransactionStatusEnum.Queued: - webhookConfig = await getWebhookConfig(WebhooksEventTypes.QUEUED_TX); - break; - case TransactionStatusEnum.Submitted: - webhookConfig = await getWebhookConfig(WebhooksEventTypes.SENT_TX); - break; - case TransactionStatusEnum.Retried: - webhookConfig = await getWebhookConfig(WebhooksEventTypes.RETRIED_TX); - break; - case TransactionStatusEnum.Mined: - webhookConfig = await getWebhookConfig(WebhooksEventTypes.MINED_TX); - break; - case TransactionStatusEnum.Errored: - webhookConfig = await getWebhookConfig(WebhooksEventTypes.ERRORED_TX); - break; - case TransactionStatusEnum.Cancelled: - webhookConfig = await getWebhookConfig(WebhooksEventTypes.ERRORED_TX); - break; - } - } - - webhookConfig?.map(async (config) => { - if (!config || !config?.active) { - logger.server.debug("No Webhook Set or Active, skipping webhook send"); + for (const txData of txDataByIds!) { + if (!txData) { return; + } else { + let webhookConfig: SanitizedWebHooksSchema[] | undefined = + await getWebhookConfig(WebhooksEventTypes.ALL_TX); + + if (!webhookConfig) { + switch (txData.status) { + case TransactionStatusEnum.Queued: + webhookConfig = await getWebhookConfig( + WebhooksEventTypes.QUEUED_TX, + ); + break; + case TransactionStatusEnum.Submitted: + webhookConfig = await getWebhookConfig( + WebhooksEventTypes.SENT_TX, + ); + break; + case TransactionStatusEnum.Retried: + webhookConfig = await getWebhookConfig( + WebhooksEventTypes.RETRIED_TX, + ); + break; + case TransactionStatusEnum.Mined: + webhookConfig = await getWebhookConfig( + WebhooksEventTypes.MINED_TX, + ); + break; + case TransactionStatusEnum.Errored: + webhookConfig = await getWebhookConfig( + WebhooksEventTypes.ERRORED_TX, + ); + break; + case TransactionStatusEnum.Cancelled: + webhookConfig = await getWebhookConfig( + WebhooksEventTypes.ERRORED_TX, + ); + break; + } + } + + webhookConfig?.map(async (config) => { + if (!config || !config?.active) { + logger.server.debug( + "No Webhook Set or Active, skipping webhook send", + ); + return; + } + + await sendWebhookRequest(config, txData); + }); } - - await sendWebhookRequest(config, txData); - }); + } } catch (error) { - logger.server.error(`[sendWebhook] error: ${error}`); + logger.server.error(error); } }; @@ -171,6 +169,6 @@ export const sendBalanceWebhook = async ( } }); } catch (error) { - logger.server.error(`[sendWebhook] error: ${error}`); + logger.server.error(error); } }; diff --git a/src/worker/listeners/queuedTxListener.ts b/src/worker/listeners/queuedTxListener.ts index c0426e1ef..eebd86dbc 100644 --- a/src/worker/listeners/queuedTxListener.ts +++ b/src/worker/listeners/queuedTxListener.ts @@ -1,6 +1,5 @@ import PQueue from "p-queue"; import { knex } from "../../db/client"; -import { sendTxWebhook } from "../../server/utils/webhook"; import { logger } from "../../utils/logger"; import { processTx } from "../tasks/processTx"; @@ -30,8 +29,6 @@ export const queuedTxListener = async (): Promise => { connection.on( "notification", async (msg: { channel: string; payload: string }) => { - const parsedPayload = JSON.parse(msg.payload); - await sendTxWebhook(parsedPayload); queue.add(processTx); }, ); diff --git a/src/worker/listeners/updateTxListener.ts b/src/worker/listeners/updateTxListener.ts index 01492ab5e..72ae5c341 100644 --- a/src/worker/listeners/updateTxListener.ts +++ b/src/worker/listeners/updateTxListener.ts @@ -1,7 +1,6 @@ import { knex } from "../../db/client"; import { getTxById } from "../../db/transactions/getTxById"; import { subscriptionsData } from "../../server/schemas/websocket"; -import { sendTxWebhook } from "../../server/utils/webhook"; import { formatSocketMessage, getStatusMessageAndConnectionStatus, @@ -19,9 +18,6 @@ export const updateTxListener = async (): Promise => { async (msg: { channel: string; payload: string }) => { const parsedPayload = JSON.parse(msg.payload); - // Send webhook - await sendTxWebhook(parsedPayload); - // Send websocket message const index = subscriptionsData.findIndex( (sub) => sub.requestId === parsedPayload.identifier, diff --git a/src/worker/tasks/processTx.ts b/src/worker/tasks/processTx.ts index 149952874..ee217c4ff 100644 --- a/src/worker/tasks/processTx.ts +++ b/src/worker/tasks/processTx.ts @@ -18,7 +18,7 @@ import { TransactionStatusEnum, transactionResponseSchema, } from "../../server/schemas/transaction"; -import { sendBalanceWebhook } from "../../server/utils/webhook"; +import { sendBalanceWebhook, sendTxWebhook } from "../../server/utils/webhook"; import { getSdk } from "../../utils/cache/getSdk"; import { env } from "../../utils/env"; import { logger } from "../../utils/logger"; @@ -39,6 +39,8 @@ type SentTxStatus = export const processTx = async () => { try { + // 0. Initialize queueIds to send webhook + const sendWebhookForQueueIds: string[] = []; await prisma.$transaction( async (pgtx) => { // 1. Select a batch of transactions and lock the rows so no other workers pick them up @@ -48,6 +50,8 @@ export const processTx = async () => { if (txs.length < config.minTxsToProcess) { return; } + // Send Queued Webhook + await sendTxWebhook(txs.map((tx) => tx.queueId!)); // 2. Iterate through all filtering cancelled trandsactions, and sorting transactions and user operations const txsToSend = []; @@ -59,7 +63,6 @@ export const processTx = async () => { ); continue; } - logger.worker.info( `[Transaction] [${tx.queueId}] Picked up by worker`, ); @@ -287,6 +290,7 @@ export const processTx = async () => { }); break; } + sendWebhookForQueueIds.push(tx.queueId!); }), ); }); @@ -324,11 +328,11 @@ export const processTx = async () => { userOpHash, }, }); + sendWebhookForQueueIds.push(tx.queueId!); } catch (err: any) { logger.worker.warn( `[User Operation] [${tx.queueId}] Failed to send with error - ${err}`, ); - await updateTx({ pgtx, queueId: tx.queueId!, @@ -340,6 +344,7 @@ export const processTx = async () => { `Failed to handle transaction`, }, }); + sendWebhookForQueueIds.push(tx.queueId!); } }); @@ -351,6 +356,8 @@ export const processTx = async () => { timeout: 5 * 60000, }, ); + + await sendTxWebhook(sendWebhookForQueueIds); } catch (err: any) { logger.worker.error( `Failed to process batch of transactions with error - ${err}`, diff --git a/src/worker/tasks/updateMinedTx.ts b/src/worker/tasks/updateMinedTx.ts index ca6905535..0c2521425 100644 --- a/src/worker/tasks/updateMinedTx.ts +++ b/src/worker/tasks/updateMinedTx.ts @@ -1,78 +1,94 @@ import { Transactions } from "@prisma/client"; import { getBlock } from "@thirdweb-dev/sdk"; import { ethers } from "ethers"; +import { prisma } from "../../db/client"; import { getSentTxs } from "../../db/transactions/getSentTxs"; import { updateTx } from "../../db/transactions/updateTx"; import { TransactionStatusEnum } from "../../server/schemas/transaction"; +import { sendTxWebhook } from "../../server/utils/webhook"; import { getSdk } from "../../utils/cache/getSdk"; import { logger } from "../../utils/logger"; export const updateMinedTx = async () => { try { - const txs = await getSentTxs(); + const sendWebhookForQueueIds: string[] = []; + await prisma.$transaction( + async (pgtx) => { + const txs = await getSentTxs({ pgtx }); - if (txs.length === 0) { - return; - } + if (txs.length === 0) { + return; + } - const txsWithReceipts = ( - await Promise.all( - txs.map(async (tx) => { - const sdk = await getSdk({ chainId: parseInt(tx.chainId!) }); - const receipt: ethers.providers.TransactionReceipt | undefined = - await sdk.getProvider().getTransactionReceipt(tx.transactionHash!); + const txsWithReceipts = ( + await Promise.all( + txs.map(async (tx) => { + const sdk = await getSdk({ chainId: parseInt(tx.chainId!) }); + const receipt: ethers.providers.TransactionReceipt | undefined = + await sdk + .getProvider() + .getTransactionReceipt(tx.transactionHash!); - if (!receipt) { - // If no receipt was received, return undefined to filter out tx - return undefined; - } + if (!receipt) { + // If no receipt was received, return undefined to filter out tx + return undefined; + } - // Get the timestamp when the transaction was mined - const minedAt = new Date( - ( - await getBlock({ - block: receipt.blockNumber, - network: sdk.getProvider(), - }) - ).timestamp * 1000, - ); + // Get the timestamp when the transaction was mined + const minedAt = new Date( + ( + await getBlock({ + block: receipt.blockNumber, + network: sdk.getProvider(), + }) + ).timestamp * 1000, + ); - return { - tx, - receipt, - minedAt, - }; - }), - ) - ).filter((txWithReceipt) => { - // Filter out transactions with no receipt to be picked up by a future worker - return !!txWithReceipt; - }) as { - tx: Transactions; - receipt: ethers.providers.TransactionReceipt; - minedAt: Date; - }[]; + return { + tx, + receipt, + minedAt, + }; + }), + ) + ).filter((txWithReceipt) => { + // Filter out transactions with no receipt to be picked up by a future worker + return !!txWithReceipt; + }) as { + tx: Transactions; + receipt: ethers.providers.TransactionReceipt; + minedAt: Date; + }[]; - // Update all transactions with a receipt in parallel - await Promise.all( - txsWithReceipts.map(async (txWithReceipt) => { - await updateTx({ - queueId: txWithReceipt.tx.id, - data: { - status: TransactionStatusEnum.Mined, - gasPrice: txWithReceipt.receipt.effectiveGasPrice.toString(), - blockNumber: txWithReceipt.receipt.blockNumber, - minedAt: txWithReceipt.minedAt, - }, - }); - - logger.worker.info( - `[Transaction] [${txWithReceipt.tx.id}] Updated with receipt`, + // Update all transactions with a receipt in parallel + await Promise.all( + txsWithReceipts.map(async (txWithReceipt) => { + await updateTx({ + pgtx, + queueId: txWithReceipt.tx.id, + data: { + status: TransactionStatusEnum.Mined, + gasPrice: txWithReceipt.receipt.effectiveGasPrice.toString(), + blockNumber: txWithReceipt.receipt.blockNumber, + minedAt: txWithReceipt.minedAt, + onChainTxStatus: txWithReceipt.receipt.status, + }, + }); + logger.worker.info( + `[Transaction] [${txWithReceipt.tx.id}] Updated with receipt`, + ); + sendWebhookForQueueIds.push(txWithReceipt.tx.id); + }), ); - }), + }, + { + timeout: 5 * 60000, + }, ); + + await sendTxWebhook(sendWebhookForQueueIds); } catch (err) { - logger.worker.error(`Failed to update receipts with error - ${err}`); + logger.worker.error(err); return; } }; diff --git a/src/worker/tasks/updateMinedUserOps.ts b/src/worker/tasks/updateMinedUserOps.ts index 8f651b933..eb0c85e5f 100644 --- a/src/worker/tasks/updateMinedUserOps.ts +++ b/src/worker/tasks/updateMinedUserOps.ts @@ -1,72 +1,109 @@ import { getBlock } from "@thirdweb-dev/sdk"; import { ERC4337EthersSigner } from "@thirdweb-dev/wallets/dist/declarations/src/evm/connectors/smart-wallet/lib/erc4337-signer"; +import { prisma } from "../../db/client"; import { getSentUserOps } from "../../db/transactions/getSentUserOps"; import { updateTx } from "../../db/transactions/updateTx"; import { TransactionStatusEnum } from "../../server/schemas/transaction"; +import { sendTxWebhook } from "../../server/utils/webhook"; import { getSdk } from "../../utils/cache/getSdk"; import { logger } from "../../utils/logger"; export const updateMinedUserOps = async () => { try { - const userOps = await getSentUserOps(); + const sendWebhookForQueueIds: string[] = []; + await prisma.$transaction( + async (pgtx) => { + const userOps = await getSentUserOps({ pgtx }); - if (userOps.length === 0) { - return; - } + if (userOps.length === 0) { + return; + } - // TODO: Improve spaghetti code... - const updatedUserOps = ( - await Promise.all( - userOps.map(async (userOp) => { - const sdk = await getSdk({ - chainId: parseInt(userOp.chainId!), - walletAddress: userOp.signerAddress!, - accountAddress: userOp.accountAddress!, - }); + // TODO: Improve spaghetti code... + const updatedUserOps = ( + await Promise.all( + userOps.map(async (userOp) => { + const sdk = await getSdk({ + chainId: parseInt(userOp.chainId!), + walletAddress: userOp.signerAddress!, + accountAddress: userOp.accountAddress!, + }); - const signer = sdk.getSigner() as ERC4337EthersSigner; - const txHash = await signer.smartAccountAPI.getUserOpReceipt( - userOp.userOpHash!, - 3000, - ); + const signer = sdk.getSigner() as ERC4337EthersSigner; - if (!txHash) { - // If no receipt was received, return undefined to filter out tx - return undefined; - } + const txHash = await signer.smartAccountAPI.getUserOpReceipt( + userOp.userOpHash!, + 3000, + ); - const tx = await signer.provider!.getTransaction(txHash); - const minedAt = new Date( - ( - await getBlock({ - block: tx.blockNumber!, - network: sdk.getProvider(), - }) - ).timestamp * 1000, - ); + if (!txHash) { + // If no receipt was received, return undefined to filter out tx + return undefined; + } + const _sdk = await getSdk({ + chainId: parseInt(userOp.chainId!), + }); - return { - ...userOp, - blockNumber: tx.blockNumber!, - minedAt, - }; - }), - ) - ).filter((userOp) => !!userOp); + const tx = await signer.provider!.getTransaction(txHash); + const txReceipt = await _sdk + .getProvider() + .getTransactionReceipt(txHash); + const minedAt = new Date( + ( + await getBlock({ + block: tx.blockNumber!, + network: sdk.getProvider(), + }) + ).timestamp * 1000, + ); - await Promise.all( - updatedUserOps.map(async (userOp) => { - await updateTx({ - queueId: userOp!.id, - data: { - status: TransactionStatusEnum.Mined, - minedAt: userOp!.minedAt, - }, - }); + return { + ...userOp, + blockNumber: tx.blockNumber!, + minedAt, + onChainTxStatus: txReceipt.status, + transactionHash: txHash, + transactionType: tx.type, + gasLimit: tx.gasLimit.toString(), + maxFeePerGas: tx.maxFeePerGas?.toString(), + maxPriorityFeePerGas: tx.maxPriorityFeePerGas?.toString(), + }; + }), + ) + ).filter((userOp) => !!userOp); - logger.worker.info(`[User Op] [${userOp!.id}] Updated with receipt`); - }), + await Promise.all( + updatedUserOps.map(async (userOp) => { + await updateTx({ + pgtx, + queueId: userOp!.id, + data: { + status: TransactionStatusEnum.Mined, + minedAt: userOp!.minedAt, + blockNumber: userOp!.blockNumber, + onChainTxStatus: userOp!.onChainTxStatus, + transactionHash: userOp!.transactionHash, + transactionType: userOp!.transactionType || undefined, + gasLimit: userOp!.gasLimit || undefined, + maxFeePerGas: userOp!.maxFeePerGas || undefined, + maxPriorityFeePerGas: userOp!.maxPriorityFeePerGas || undefined, + gasPrice: userOp!.gasPrice || undefined, + }, + }); + + logger.worker.info( + `[User Op] [${userOp!.id}] Updated with receipt`, + ); + sendWebhookForQueueIds.push(userOp!.id); + }), + ); + }, + { + timeout: 5 * 60000, + }, ); + + await sendTxWebhook(sendWebhookForQueueIds); } catch (err) { logger.worker.error(`Failed to update receipts with error - ${err}`); return; diff --git a/test/load/index.ts b/test/load/index.ts index e7532da9f..11e8144b2 100644 --- a/test/load/index.ts +++ b/test/load/index.ts @@ -6,6 +6,8 @@ import { z } from "zod"; import { fetchEngine } from "./utils/fetch"; import { createTimer, sleep } from "./utils/time"; +process.env.NODE_TLS_REJECT_UNAUTHORIZED = "0"; + const OptionsSchema = z.object({ host: z.string().optional(), path: z.string().optional(), diff --git a/yarn.lock b/yarn.lock index ac268b28d..8e151c485 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1687,7 +1687,7 @@ resolved "https://registry.yarnpkg.com/@noble/hashes/-/hashes-1.3.1.tgz#8831ef002114670c603c458ab8b11328406953a9" integrity sha512-EbqwksQwz9xDRGfDST86whPBgM65E0OH/pCgqW0GBVzO22bNE+NuIbeTb714+IfSjU3aRk47EUvXIb5bTsenKA== -"@noble/hashes@1.3.2", "@noble/hashes@~1.3.0", "@noble/hashes@~1.3.2": +"@noble/hashes@1.3.2", "@noble/hashes@^1.3.2", "@noble/hashes@~1.3.0", "@noble/hashes@~1.3.2": version "1.3.2" resolved "https://registry.yarnpkg.com/@noble/hashes/-/hashes-1.3.2.tgz#6f26dbc8fbc7205873ce3cee2f690eba0d421b39" integrity sha512-MVC8EAQp7MvEcm30KWENFjgR+Mkmf+D189XJTkFIlwohU5hcBbn1ZkKq7KVTi2Hme3PMGF390DaL52beVrIihQ== @@ -2840,6 +2840,11 @@ resolved "https://registry.yarnpkg.com/@thirdweb-dev/chains/-/chains-0.1.58.tgz#5a26fe187ef39b7c6af87972166785d110cad53e" integrity sha512-prSShAWoLODuZQcDBwNDqcXLzfevV2BOw54cDaHetSP+Sw/BP6SaPKIxojRQGsXARjn0JMWniG/NCtppUUHALQ== +"@thirdweb-dev/chains@0.1.59": + version "0.1.59" + resolved "https://registry.yarnpkg.com/@thirdweb-dev/chains/-/chains-0.1.59.tgz#6dc700ac6c8acf584ba6ac268b12606bd09236d7" + integrity sha512-fd1ZjHGwz/P1eRPffxfhITa/wv+DJuIG/5SIRZS22mOKTQNHr8+HtrmO/sous/D8ZMgIHmRCmOMeUfiYinxdKA== + "@thirdweb-dev/contracts-js@1.3.16": version "1.3.16" resolved "https://registry.yarnpkg.com/@thirdweb-dev/contracts-js/-/contracts-js-1.3.16.tgz#264727b40b0f320c01eefcb4295e89c9e9947002" @@ -2858,6 +2863,14 @@ "@thirdweb-dev/dynamic-contracts" "^1.1.2" erc721a-upgradeable "^3.3.0" +"@thirdweb-dev/crypto@0.2.0": + version "0.2.0" + resolved "https://registry.yarnpkg.com/@thirdweb-dev/crypto/-/crypto-0.2.0.tgz#ad629854107b58647101fc6f3d2bbe619ae34424" + integrity sha512-hQwSCL/imqSCcnUXlGqJi6dfs4UOcJ91Eq/t1cPXyAb6nwvyaePZPVFqGDglZMQvkS/NWZhifXZINRiCfazn2w== + dependencies: + "@noble/hashes" "^1.3.2" + js-sha3 "^0.9.2" + "@thirdweb-dev/dynamic-contracts@^1.1.2": version "1.1.4" resolved "https://registry.yarnpkg.com/@thirdweb-dev/dynamic-contracts/-/dynamic-contracts-1.1.4.tgz#f14de117bad647d0f32073325627f4b31963446f" @@ -2868,7 +2881,17 @@ resolved "https://registry.yarnpkg.com/@thirdweb-dev/generated-abis/-/generated-abis-0.0.1.tgz#0d788d6aff0ac08f11e9eeb9ae4c8321845272a8" integrity sha512-vO9/3lSLO8smyyH1QVeYravSTzFwV1nf1C/Im1NBDPdH8//YvcbhtETGGiNfHWpyCvSi0vRYwvf+/7FKdwpDGQ== -"@thirdweb-dev/sdk@4.0.13", "@thirdweb-dev/sdk@^4.0.13": +"@thirdweb-dev/merkletree@0.2.0": + version "0.2.0" + resolved "https://registry.yarnpkg.com/@thirdweb-dev/merkletree/-/merkletree-0.2.0.tgz#b1f7275bd54b499bb0c98863692e77b146308eb4" + integrity sha512-4KoH2EOCWKiaHfhDO5Tnf1HjeCXKVfLt31y0kcSG5C0gCldnhm7i1fGUB8e0hW3trfyPQAuSgyP67Ep3UwzClg== + dependencies: + "@thirdweb-dev/crypto" "0.2.0" + buffer "^6.0.3" + buffer-reverse "^1.0.1" + treeify "^1.1.0" + +"@thirdweb-dev/sdk@4.0.13": version "4.0.13" resolved "https://registry.yarnpkg.com/@thirdweb-dev/sdk/-/sdk-4.0.13.tgz#8e960271d019bfdcce553c8665b5dd2874d27f83" integrity sha512-466UxsdqS6j+UW7gim64g8dOBZ4XVp/p7uV7bKdFNgYC2fM8OQk3aidg0BfL3J00VwzKabjJmdA8gR506NTACQ== @@ -2912,6 +2935,29 @@ yaml "^2.3.1" zod "^3.22.3" +"@thirdweb-dev/sdk@^4.0.17": + version "4.0.17" + resolved "https://registry.yarnpkg.com/@thirdweb-dev/sdk/-/sdk-4.0.17.tgz#213aa8d81a50d3b827d5691b02fe8cdedf8cc3d9" + integrity sha512-1OZiOSRmNsrMzkzGcoDoUnUlUUFCJwjDqFTQACgoD/LL0zj4X+W6IZ9qngMsJ98tU13ZHpYWwSkI/flzg5Jzkg== + dependencies: + "@thirdweb-dev/chains" "0.1.59" + "@thirdweb-dev/contracts-js" "1.3.16" + "@thirdweb-dev/crypto" "0.2.0" + "@thirdweb-dev/generated-abis" "0.0.1" + "@thirdweb-dev/merkletree" "0.2.0" + "@thirdweb-dev/storage" "2.0.5" + abitype "^0.2.5" + bn.js "^5.2.1" + bs58 "^5.0.0" + buffer "^6.0.3" + eventemitter3 "^5.0.1" + fast-deep-equal "^3.1.3" + tiny-invariant "^1.2.0" + tweetnacl "^1.0.3" + uuid "^9.0.1" + yaml "^2.3.1" + zod "^3.22.3" + "@thirdweb-dev/service-utils@^0.4.2": version "0.4.2" resolved "https://registry.yarnpkg.com/@thirdweb-dev/service-utils/-/service-utils-0.4.2.tgz#bb22fcb38349bf4c5879cbfc105902d3cc32413a" @@ -2938,6 +2984,16 @@ form-data "^4.0.0" uuid "^9.0.1" +"@thirdweb-dev/storage@2.0.5": + version "2.0.5" + resolved "https://registry.yarnpkg.com/@thirdweb-dev/storage/-/storage-2.0.5.tgz#e492923ca0037db7cd8f30572333a3bb2f67639e" + integrity sha512-I3DK/ZNWOMa/XE2hfJnGKVfc9INn5c3if1qavyK/1fjJBxhUiUXjT59UYbuoWhHLEq0rS/QZVOGS/9qcOs/DAQ== + dependencies: + "@thirdweb-dev/crypto" "0.2.0" + cid-tool "^3.0.0" + form-data "^4.0.0" + uuid "^9.0.1" + "@thirdweb-dev/wallets@2.0.8-nightly-c238fde8-20231020022304": version "2.0.8-nightly-c238fde8-20231020022304" resolved "https://registry.yarnpkg.com/@thirdweb-dev/wallets/-/wallets-2.0.8-nightly-c238fde8-20231020022304.tgz#1f4f93060e2a7155275117e78420f1107cb16572" @@ -6715,6 +6771,11 @@ js-sha3@0.8.0, js-sha3@^0.8.0: resolved "https://registry.yarnpkg.com/js-sha3/-/js-sha3-0.8.0.tgz#b9b7a5da73afad7dedd0f8c463954cbde6818840" integrity sha512-gF1cRrHhIzNfToc802P800N8PpXS+evLLXfsVpowqmAFR9uwbi89WvXg2QspOmXL8QL86J4T1EpFu+yUkwJY3Q== +js-sha3@^0.9.2: + version "0.9.2" + resolved "https://registry.yarnpkg.com/js-sha3/-/js-sha3-0.9.2.tgz#a5ba3967ddf5a095f7b3389ef14a6297b10d6409" + integrity sha512-8kgvwd03wNGQG1GRvl3yy1Yt40sICAcIMsDU2ZLgoL0Z6z9rkRmf9Vd+bi/gYSzgAqMUGl/jiDKu0J8AWFd+BQ== + js-yaml@4.1.0, js-yaml@^4.1.0: version "4.1.0" resolved "https://registry.yarnpkg.com/js-yaml/-/js-yaml-4.1.0.tgz#c1fb65f8f5017901cdd2c951864ba18458a10602"