From b70454ef9621729b49d297003e53f7a1e6f7192c Mon Sep 17 00:00:00 2001 From: Adam Majmudar <64697628+adam-maj@users.noreply.github.com> Date: Tue, 5 Dec 2023 14:11:50 -0800 Subject: [PATCH] Add layered error handling to worker (#333) * Add layered error handling to worker * Update * Update --- src/db/transactions/queueTx.ts | 15 +- src/worker/tasks/processTx.ts | 500 +++++++++++++++++---------------- 2 files changed, 276 insertions(+), 239 deletions(-) diff --git a/src/db/transactions/queueTx.ts b/src/db/transactions/queueTx.ts index e3e51b36d..fc4a7d051 100644 --- a/src/db/transactions/queueTx.ts +++ b/src/db/transactions/queueTx.ts @@ -8,6 +8,7 @@ import { BigNumber } from "ethers"; import type { ContractExtension } from "../../schema/extension"; import { PrismaTransaction } from "../../schema/prisma"; import { getPrismaWithPostgresTx } from "../client"; +import { getWalletDetails } from "../wallets/getWalletDetails"; interface QueueTxParams { pgtx?: PrismaTransaction; @@ -40,6 +41,16 @@ export const queueTx = async ({ const prisma = getPrismaWithPostgresTx(pgtx); + const fromAddress = (await tx.getSignerAddress()).toLowerCase(); + const walletDetails = await getWalletDetails({ + pgtx, + address: fromAddress, + }); + + if (!walletDetails) { + throw new Error(`No configured wallet found with address ${fromAddress}`); + } + // TODO: We need a much safer way of detecting if the transaction should be a user operation const isUserOp = !!(tx.getSigner as ERC4337EthersSigner).erc4337provider; const txTableData = { @@ -61,7 +72,7 @@ export const queueTx = async ({ signerAddress: await ( tx.getSigner as ERC4337EthersSigner ).originalSigner.getAddress(), - accountAddress: (await tx.getSignerAddress()).toLowerCase(), + accountAddress: fromAddress, // Fields needed to send user operation target: tx.getTarget().toLowerCase(), }, @@ -73,7 +84,7 @@ export const queueTx = async ({ data: { ...txTableData, // Fields needed to send transaction - fromAddress: (await tx.getSignerAddress()).toLowerCase(), + fromAddress, toAddress: tx.getTarget().toLowerCase(), }, }); diff --git a/src/worker/tasks/processTx.ts b/src/worker/tasks/processTx.ts index ffd09e3ef..2eeb8f870 100644 --- a/src/worker/tasks/processTx.ts +++ b/src/worker/tasks/processTx.ts @@ -104,274 +104,300 @@ export const processTx = async () => { parseInt(key.split("-")[1]), ]; - const sdk = await getSdk({ - pgtx, - chainId, - walletAddress, - }); - - const [signer, provider] = await Promise.all([ - sdk.getSigner(), - sdk.getProvider() as StaticJsonRpcBatchProvider, - ]); - - if (!signer || !provider) { - return; - } - - // - For each wallet address, check the nonce in database and the mempool - const [walletBalance, mempoolNonceData, dbNonceData, gasOverrides] = - await Promise.all([ - sdk.wallet.balance(), - sdk.wallet.getNonce("pending"), - getWalletNonce({ - pgtx, - chainId, - address: walletAddress, - }), - getDefaultGasOverrides(provider), - ]); - - // Wallet Balance Webhook - if ( - BigNumber.from(walletBalance.value).lte( - BigNumber.from(config.minWalletBalance), - ) - ) { - const message = - "Wallet balance is below minimum threshold. Please top up your wallet."; - const walletBalanceData: WalletBalanceWebhookSchema = { - walletAddress, - minimumBalance: ethers.utils.formatEther(config.minWalletBalance), - currentBalance: walletBalance.displayValue, - chainId, - message, - }; - - await sendBalanceWebhook(walletBalanceData); - - logger.worker.warn( - `[Low Wallet Balance] [${walletAddress}]: ` + message, - ); - } - - if (!dbNonceData) { - logger.worker.error( - `Could not find nonce or details for wallet ${walletAddress} on chain ${chainId}`, - ); - } - - // - Take the larger of the nonces, and update database nonce to mepool value if mempool is greater - let startNonce: BigNumber; - const mempoolNonce = BigNumber.from(mempoolNonceData); - const dbNonce = BigNumber.from(dbNonceData?.nonce || 0); - if (mempoolNonce.gt(dbNonce)) { - await updateWalletNonce({ + try { + const sdk = await getSdk({ pgtx, chainId, - address: walletAddress, - nonce: mempoolNonce.toNumber(), + walletAddress, }); - startNonce = mempoolNonce; - } else { - startNonce = dbNonce; - } + const [signer, provider] = await Promise.all([ + sdk.getSigner(), + sdk.getProvider() as StaticJsonRpcBatchProvider, + ]); - // Group all transactions into a single batch rpc request - let sentTxCount = 0; - const rpcResponses: { - queueId: string; - tx: ethers.providers.TransactionRequest; - res: RpcResponse; - }[] = []; - for (const tx of txsToSend) { - const nonce = startNonce.add(sentTxCount); - - try { - const txRequest = await signer.populateTransaction({ - to: tx.toAddress!, - from: tx.fromAddress!, - data: tx.data!, - value: tx.value!, - nonce, - ...gasOverrides, - }); + if (!signer || !provider) { + return; + } - // TODO: We need to target specific cases here - // Bump gas limit to avoid occasional out of gas errors - txRequest.gasLimit = txRequest.gasLimit - ? BigNumber.from(txRequest.gasLimit).mul(120).div(100) - : undefined; - - logger.worker.info( - `[Transaction] [${tx.queueId}] Using maxFeePerGas ${ - txRequest.maxFeePerGas !== undefined - ? formatUnits(txRequest.maxFeePerGas, "gwei") - : undefined - }, maxPriorityFeePerGas ${ - txRequest.maxPriorityFeePerGas !== undefined - ? formatUnits(txRequest.maxPriorityFeePerGas, "gwei") - : undefined - }, gasPrice ${ - txRequest.gasPrice !== undefined - ? formatUnits(txRequest.gasPrice, "gwei") - : undefined - }`, - ); + // - For each wallet address, check the nonce in database and the mempool + const [walletBalance, mempoolNonceData, dbNonceData, gasOverrides] = + await Promise.all([ + sdk.wallet.balance(), + sdk.wallet.getNonce("pending"), + getWalletNonce({ + pgtx, + chainId, + address: walletAddress, + }), + getDefaultGasOverrides(provider), + ]); + + // Wallet Balance Webhook + if ( + BigNumber.from(walletBalance.value).lte( + BigNumber.from(config.minWalletBalance), + ) + ) { + const message = + "Wallet balance is below minimum threshold. Please top up your wallet."; + const walletBalanceData: WalletBalanceWebhookSchema = { + walletAddress, + minimumBalance: ethers.utils.formatEther( + config.minWalletBalance, + ), + currentBalance: walletBalance.displayValue, + chainId, + message, + }; - const signature = await signer.signTransaction(txRequest); + await sendBalanceWebhook(walletBalanceData); - logger.worker.info( - `[Transaction] [${tx.queueId}] Sending transaction to ${provider.connection.url}`, + logger.worker.warn( + `[Low Wallet Balance] [${walletAddress}]: ` + message, ); - const res = await fetch(provider.connection.url, { - method: "POST", - headers: { - "Content-Type": "application/json", - ...(provider.connection.url.includes("rpc.thirdweb.com") - ? { - "x-secret-key": env.THIRDWEB_API_SECRET_KEY, - } - : {}), - }, - body: JSON.stringify({ - id: 0, - jsonrpc: "2.0", - method: "eth_sendRawTransaction", - params: [signature], - }), - }); - const rpcResponse = (await res.json()) as RpcResponse; - rpcResponses.push({ - queueId: tx.queueId!, - tx: txRequest, - res: rpcResponse, - }); + } - if (!rpcResponse.error && !!rpcResponse.result) { - sentTxCount++; - } - } catch (err: any) { - logger.worker.warn( - `[Transaction] [${ - tx.queueId - }] Failed to build transaction with error - ${ - err?.message || err - }`, + if (!dbNonceData) { + logger.worker.error( + `Could not find nonce or details for wallet ${walletAddress} on chain ${chainId}`, ); + } - await updateTx({ + // - Take the larger of the nonces, and update database nonce to mepool value if mempool is greater + let startNonce: BigNumber; + const mempoolNonce = BigNumber.from(mempoolNonceData); + const dbNonce = BigNumber.from(dbNonceData?.nonce || 0); + if (mempoolNonce.gt(dbNonce)) { + await updateWalletNonce({ pgtx, - queueId: tx.queueId!, - data: { - status: TransactionStatusEnum.Errored, - errorMessage: - err?.message || - err?.toString() || - `Failed to handle transaction`, - }, + chainId, + address: walletAddress, + nonce: mempoolNonce.toNumber(), }); - sendWebhookForQueueIds.push(tx.queueId!); + + startNonce = mempoolNonce; + } else { + startNonce = dbNonce; } - } - // Check how many transactions succeeded and increment nonce - const incrementNonce = rpcResponses.reduce((acc, curr) => { - return curr.res.result && !curr.res.error ? acc + 1 : acc; - }, 0); + // Group all transactions into a single batch rpc request + let sentTxCount = 0; + const rpcResponses: { + queueId: string; + tx: ethers.providers.TransactionRequest; + res: RpcResponse; + }[] = []; + for (const tx of txsToSend) { + const nonce = startNonce.add(sentTxCount); + + try { + const txRequest = await signer.populateTransaction({ + to: tx.toAddress!, + from: tx.fromAddress!, + data: tx.data!, + value: tx.value!, + nonce, + ...gasOverrides, + }); + + // TODO: We need to target specific cases + // Bump gas limit to avoid occasional out of gas errors + txRequest.gasLimit = txRequest.gasLimit + ? BigNumber.from(txRequest.gasLimit).mul(120).div(100) + : undefined; - await updateWalletNonce({ - pgtx, - address: walletAddress, - chainId, - nonce: startNonce.toNumber() + incrementNonce, - }); + logger.worker.info( + `[Transaction] [${tx.queueId}] Using maxFeePerGas ${ + txRequest.maxFeePerGas !== undefined + ? formatUnits(txRequest.maxFeePerGas, "gwei") + : undefined + }, maxPriorityFeePerGas ${ + txRequest.maxPriorityFeePerGas !== undefined + ? formatUnits(txRequest.maxPriorityFeePerGas, "gwei") + : undefined + }, gasPrice ${ + txRequest.gasPrice !== undefined + ? formatUnits(txRequest.gasPrice, "gwei") + : undefined + }`, + ); - // Update transaction records with updated data - const txStatuses: SentTxStatus[] = await Promise.all( - rpcResponses.map(async ({ queueId, tx, res }) => { - if (res.result) { - const txHash = res.result; - const txRes = (await provider.getTransaction( - txHash, - )) as ethers.providers.TransactionResponse | null; + const signature = await signer.signTransaction(txRequest); logger.worker.info( - `[Transaction] [${queueId}] Sent transaction with hash '${txHash}' and nonce '${tx.nonce}'`, + `[Transaction] [${tx.queueId}] Sending transaction to ${provider.connection.url}`, ); - - return { - transactionHash: txHash, - status: TransactionStatusEnum.Submitted, - queueId: queueId, - res: txRes, - sentAtBlockNumber: await provider.getBlockNumber(), - }; - } else { + const res = await fetch(provider.connection.url, { + method: "POST", + headers: { + "Content-Type": "application/json", + ...(provider.connection.url.includes("rpc.thirdweb.com") + ? { + "x-secret-key": env.THIRDWEB_API_SECRET_KEY, + } + : {}), + }, + body: JSON.stringify({ + id: 0, + jsonrpc: "2.0", + method: "eth_sendRawTransaction", + params: [signature], + }), + }); + const rpcResponse = (await res.json()) as RpcResponse; + rpcResponses.push({ + queueId: tx.queueId!, + tx: txRequest, + res: rpcResponse, + }); + + if (!rpcResponse.error && !!rpcResponse.result) { + sentTxCount++; + } + } catch (err: any) { logger.worker.warn( - `[Transaction] [${queueId}] Failed to send with error - ${JSON.stringify( - res.error, - )}`, + `[Transaction] [${ + tx.queueId + }] Failed to build transaction with error - ${ + err?.message || err + }`, ); - return { - status: TransactionStatusEnum.Errored, - queueId: queueId, - errorMessage: - res.error?.message || - res.error?.toString() || - `Failed to handle transaction`, - }; + await updateTx({ + pgtx, + queueId: tx.queueId!, + data: { + status: TransactionStatusEnum.Errored, + errorMessage: + err?.message || + err?.toString() || + `Failed to handle transaction`, + }, + }); + sendWebhookForQueueIds.push(tx.queueId!); } - }), - ); + } - // - After sending transactions, update database for each transaction - await Promise.all( - txStatuses.map(async (tx) => { - switch (tx.status) { - case TransactionStatusEnum.Submitted: - await updateTx({ - pgtx, - queueId: tx.queueId, - data: { - status: TransactionStatusEnum.Submitted, - transactionHash: tx.transactionHash, - res: tx.res, - sentAtBlockNumber: await provider.getBlockNumber(), - }, - }); - break; - case TransactionStatusEnum.Errored: - await updateTx({ - pgtx, - queueId: tx.queueId, - data: { - status: TransactionStatusEnum.Errored, - errorMessage: tx.errorMessage, - }, - }); - break; - } - sendWebhookForQueueIds.push(tx.queueId!); - }), - ); + // Check how many transactions succeeded and increment nonce + const incrementNonce = rpcResponses.reduce((acc, curr) => { + return curr.res.result && !curr.res.error ? acc + 1 : acc; + }, 0); + + await updateWalletNonce({ + pgtx, + address: walletAddress, + chainId, + nonce: startNonce.toNumber() + incrementNonce, + }); + + // Update transaction records with updated data + const txStatuses: SentTxStatus[] = await Promise.all( + rpcResponses.map(async ({ queueId, tx, res }) => { + if (res.result) { + const txHash = res.result; + const txRes = (await provider.getTransaction( + txHash, + )) as ethers.providers.TransactionResponse | null; + + logger.worker.info( + `[Transaction] [${queueId}] Sent transaction with hash '${txHash}' and nonce '${tx.nonce}'`, + ); + + return { + transactionHash: txHash, + status: TransactionStatusEnum.Submitted, + queueId: queueId, + res: txRes, + sentAtBlockNumber: await provider.getBlockNumber(), + }; + } else { + logger.worker.warn( + `[Transaction] [${queueId}] Failed to send with error - ${JSON.stringify( + res.error, + )}`, + ); + + return { + status: TransactionStatusEnum.Errored, + queueId: queueId, + errorMessage: + res.error?.message || + res.error?.toString() || + `Failed to handle transaction`, + }; + } + }), + ); + + // - After sending transactions, update database for each transaction + await Promise.all( + txStatuses.map(async (tx) => { + switch (tx.status) { + case TransactionStatusEnum.Submitted: + await updateTx({ + pgtx, + queueId: tx.queueId, + data: { + status: TransactionStatusEnum.Submitted, + transactionHash: tx.transactionHash, + res: tx.res, + sentAtBlockNumber: await provider.getBlockNumber(), + }, + }); + break; + case TransactionStatusEnum.Errored: + await updateTx({ + pgtx, + queueId: tx.queueId, + data: { + status: TransactionStatusEnum.Errored, + errorMessage: tx.errorMessage, + }, + }); + break; + } + sendWebhookForQueueIds.push(tx.queueId!); + }), + ); + } catch (err: any) { + await Promise.all( + txsToSend.map(async (tx) => { + logger.worker.error( + `[Transaction] [${ + tx.queueId + }] Failed to process batch of transactions for wallet '${walletAddress}' on chain '${chainId}' - ${ + err || err?.message + }`, + ); + await updateTx({ + pgtx, + queueId: tx.queueId!, + data: { + status: TransactionStatusEnum.Errored, + errorMessage: `[Worker] [Error] Failed to process batch of transactions for wallet - ${ + err || err?.message + }`, + }, + }); + }), + ); + } }); // 5. Send all user operations in parallel with multi-dimensional nonce const sentUserOps = userOpsToSend.map(async (tx) => { - const signer = ( - await getSdk({ - pgtx, - chainId: parseInt(tx.chainId!), - walletAddress: tx.signerAddress!, - accountAddress: tx.accountAddress!, - }) - ).getSigner() as ERC4337EthersSigner; - - const nonce = randomNonce(); try { + const signer = ( + await getSdk({ + pgtx, + chainId: parseInt(tx.chainId!), + walletAddress: tx.signerAddress!, + accountAddress: tx.accountAddress!, + }) + ).getSigner() as ERC4337EthersSigner; + + const nonce = randomNonce(); const userOp = await signer.smartAccountAPI.createSignedUserOp({ target: tx.target || "", data: tx.data || "0x",