Skip to content

Commit

Permalink
Webhook Updates + On-Chain Tx Status Add (#314)
Browse files Browse the repository at this point in the history
* added new column onChainTxStatus based on receipt status. Solved webhook multisend using locks

* updates example

* updated UserOp Tx Receipt update to use prisma Transactions too

* updated userOp tx rceipt check. Updated getTxById to not use locking when not needed

* updated load test to ignore TLS/SSL when using https for testing

* updated SDK to latest to get MetaTransaction Updates

* updated return type for raw query in Transactions tbl

* webhooks updates for multi-serer setup. UserOp Status data added

* moved sendWebhookForQueueIds post update statement
  • Loading branch information
farhanW3 authored Nov 24, 2023
1 parent 793cbc9 commit c33ed20
Show file tree
Hide file tree
Showing 19 changed files with 384 additions and 251 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
"@t3-oss/env-core": "^0.6.0",
"@thirdweb-dev/auth": "^4.1.0-nightly-c238fde8-20231020022304",
"@thirdweb-dev/chains": "^0.1.55",
"@thirdweb-dev/sdk": "^4.0.13",
"@thirdweb-dev/sdk": "^4.0.17",
"@thirdweb-dev/service-utils": "^0.4.2",
"@thirdweb-dev/wallets": "^2.1.5",
"body-parser": "^1.20.2",
Expand Down
36 changes: 18 additions & 18 deletions src/db/transactions/getQueuedTxs.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Transactions } from "@prisma/client";
import { Static } from "@sinclair/typebox";
import { PrismaTransaction } from "../../schema/prisma";
import { transactionResponseSchema } from "../../server/schemas/transaction";
Expand All @@ -16,24 +17,23 @@ export const getQueuedTxs = async ({ pgtx }: GetQueuedTxsParams = {}): Promise<
const config = await getConfiguration();

// TODO: Don't use env var for transactions to batch
const txs = await prisma.$queryRaw`
SELECT
*
FROM
"transactions"
WHERE
"processedAt" IS NULL
AND "sentAt" IS NULL
AND "minedAt" IS NULL
AND "cancelledAt" IS NULL
ORDER BY
"queuedAt"
ASC
LIMIT
${config.maxTxsToProcess}
FOR UPDATE SKIP LOCKED
const txs = await prisma.$queryRaw<Transactions[]>`
SELECT
*
FROM
"transactions"
WHERE
"processedAt" IS NULL
AND "sentAt" IS NULL
AND "minedAt" IS NULL
AND "cancelledAt" IS NULL
ORDER BY
"queuedAt"
ASC
LIMIT
${config.maxTxsToProcess}
FOR UPDATE SKIP LOCKED
`;

// TODO: This might not work!
return cleanTxs(txs as any);
return cleanTxs(txs);
};
39 changes: 13 additions & 26 deletions src/db/transactions/getSentTxs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,30 +13,17 @@ export const getSentTxs = async ({ pgtx }: GetSentTxsParams = {}): Promise<
const prisma = getPrismaWithPostgresTx(pgtx);
const config = await getConfiguration();

return prisma.transactions.findMany({
where: {
processedAt: {
not: null,
},
sentAt: {
not: null,
},
transactionHash: {
not: null,
},
accountAddress: null,
minedAt: null,
errorMessage: null,
retryCount: {
// TODO: What should the max retries be here?
lt: 3,
},
},
orderBy: [
{
sentAt: "asc",
},
],
take: config.maxTxsToUpdate,
});
return prisma.$queryRaw<Transactions[]>`
SELECT * FROM "transactions"
WHERE "processedAt" IS NOT NULL
AND "sentAt" IS NOT NULL
AND "transactionHash" IS NOT NULL
AND "accountAddress" IS NULL
AND "minedAt" IS NULL
AND "errorMessage" IS NULL
AND "retryCount" < ${config.maxTxsToUpdate}
ORDER BY "sentAt" ASC
LIMIT ${config.maxTxsToUpdate}
FOR UPDATE SKIP LOCKED
`;
};
41 changes: 13 additions & 28 deletions src/db/transactions/getSentUserOps.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,17 @@ export const getSentUserOps = async ({
const prisma = getPrismaWithPostgresTx(pgtx);
const config = await getConfiguration();

return prisma.transactions.findMany({
where: {
processedAt: {
not: null,
},
sentAt: {
not: null,
},
accountAddress: {
not: null,
},
userOpHash: {
not: null,
},
minedAt: null,
errorMessage: null,
retryCount: {
// TODO: What should the max retries be here?
lt: 3,
},
},
orderBy: [
{
sentAt: "asc",
},
],
take: config.maxTxsToUpdate,
});
return prisma.$queryRaw<Transactions[]>`
SELECT * FROM "transactions"
WHERE "processedAt" IS NOT NULL
AND "sentAt" IS NOT NULL
AND "accountAddress" IS NOT NULL
AND "userOpHash" IS NOT NULL
AND "minedAt" IS NULL
AND "errorMessage" IS NULL
AND "retryCount" < 3
ORDER BY "sentAt" ASC
LIMIT ${config.maxTxsToUpdate}
FOR UPDATE SKIP LOCKED;
`;
};
2 changes: 1 addition & 1 deletion src/db/transactions/getTxById.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@ export const getTxById = async ({
typeof transactionResponseSchema
> | null> => {
const prisma = getPrismaWithPostgresTx(pgtx);

const tx = await prisma.transactions.findUnique({
where: {
id: queueId,
},
});

if (!tx) {
return null;
}
Expand Down
30 changes: 30 additions & 0 deletions src/db/transactions/getTxByIds.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { Static } from "@sinclair/typebox";
import { PrismaTransaction } from "../../schema/prisma";
import { transactionResponseSchema } from "../../server/schemas/transaction";
import { prisma } from "../client";
import { cleanTxs } from "./cleanTxs";
interface GetTxByIdsParams {
queueIds: string[];
pgtx?: PrismaTransaction;
}

export const getTxByIds = async ({
queueIds,
}: GetTxByIdsParams): Promise<
Static<typeof transactionResponseSchema>[] | null
> => {
const tx = await prisma.transactions.findMany({
where: {
id: {
in: queueIds,
},
},
});

if (!tx || tx.length === 0) {
return null;
}

const cleanedTx = cleanTxs(tx);
return cleanedTx;
};
12 changes: 12 additions & 0 deletions src/db/transactions/updateTx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,12 @@ type UpdateTxData =
gasPrice?: string;
blockNumber?: number;
minedAt: Date;
onChainTxStatus?: number;
transactionHash?: string;
transactionType?: number;
gasLimit?: string;
maxFeePerGas?: string;
maxPriorityFeePerGas?: string;
};

export const updateTx = async ({ pgtx, queueId, data }: UpdateTxParams) => {
Expand Down Expand Up @@ -109,6 +115,12 @@ export const updateTx = async ({ pgtx, queueId, data }: UpdateTxParams) => {
minedAt: data.minedAt,
gasPrice: data.gasPrice,
blockNumber: data.blockNumber,
onChainTxStatus: data.onChainTxStatus,
transactionHash: data.transactionHash,
transactionType: data.transactionType,
gasLimit: data.gasLimit,
maxFeePerGas: data.maxFeePerGas,
maxPriorityFeePerGas: data.maxPriorityFeePerGas,
},
});
break;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
-- AlterTable
ALTER TABLE "transactions" ADD COLUMN "onChainTxStatus" INTEGER;
1 change: 1 addition & 0 deletions src/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ model Transactions {
gasPrice String? @map("gasPrice")
transactionType Int? @map("transactionType")
transactionHash String? @map("transactionHash")
onChainTxStatus Int? @map("onChainTxStatus")
// User Operation
signerAddress String? @map("signerAddress")
accountAddress String? @map("accountAddress")
Expand Down
1 change: 1 addition & 0 deletions src/server/routes/transaction/status.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ responseBodySchema.example = {
errorMessage: "",
txMinedTimestamp: "2023-08-25T22:42:33.000Z",
blockNumber: 39398545,
onChainTxStatus: 1,
},
};

Expand Down
1 change: 1 addition & 0 deletions src/server/schemas/transaction/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,7 @@ export const transactionResponseSchema = Type.Object({
userOpHash: Type.Union([Type.String(), Type.Null()]),
functionName: Type.Union([Type.String(), Type.Null()]),
functionArgs: Type.Union([Type.String(), Type.Null()]),
onChainTxStatus: Type.Union([Type.Number(), Type.Null()]),
});

export enum TransactionStatusEnum {
Expand Down
118 changes: 58 additions & 60 deletions src/server/utils/webhook.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import crypto from "crypto";
import { getConfiguration } from "../../db/configuration/getConfiguration";
import { getTxById } from "../../db/transactions/getTxById";
import { getTxByIds } from "../../db/transactions/getTxByIds";
import {
SanitizedWebHooksSchema,
WalletBalanceWebhookSchema,
Expand All @@ -12,10 +11,6 @@ import { TransactionStatusEnum } from "../schemas/transaction";

let balanceNotificationLastSentAt = -1;

interface TxWebookParams {
id: string;
}

export const generateSignature = (
body: Record<string, any>,
timestamp: string,
Expand Down Expand Up @@ -74,65 +69,68 @@ export const sendWebhookRequest = async (
return true;
};

export const sendTxWebhook = async (data: TxWebookParams): Promise<void> => {
export const sendTxWebhook = async (queueIds: string[]): Promise<void> => {
try {
const txData = await getTxById({ queueId: data.id });
if (!txData) {
throw new Error(`Transaction ${data.id} not found.`);
}

let webhookConfig: SanitizedWebHooksSchema[] | undefined =
await getWebhookConfig(WebhooksEventTypes.ALL_TX);

// For Backwards Compatibility
const config = await getConfiguration();
if (config?.webhookUrl && config?.webhookAuthBearerToken) {
const newFormatWebhookData = {
id: 0,
url: config.webhookUrl,
secret: config.webhookAuthBearerToken,
active: true,
eventType: WebhooksEventTypes.ALL_TX,
createdAt: new Date().toISOString(),
name: "Legacy Webhook",
};
await sendWebhookRequest(newFormatWebhookData, txData);
const txDataByIds = await getTxByIds({ queueIds });
if (!txDataByIds || txDataByIds.length === 0) {
return;
}

if (!webhookConfig) {
switch (txData.status) {
case TransactionStatusEnum.Queued:
webhookConfig = await getWebhookConfig(WebhooksEventTypes.QUEUED_TX);
break;
case TransactionStatusEnum.Submitted:
webhookConfig = await getWebhookConfig(WebhooksEventTypes.SENT_TX);
break;
case TransactionStatusEnum.Retried:
webhookConfig = await getWebhookConfig(WebhooksEventTypes.RETRIED_TX);
break;
case TransactionStatusEnum.Mined:
webhookConfig = await getWebhookConfig(WebhooksEventTypes.MINED_TX);
break;
case TransactionStatusEnum.Errored:
webhookConfig = await getWebhookConfig(WebhooksEventTypes.ERRORED_TX);
break;
case TransactionStatusEnum.Cancelled:
webhookConfig = await getWebhookConfig(WebhooksEventTypes.ERRORED_TX);
break;
}
}

webhookConfig?.map(async (config) => {
if (!config || !config?.active) {
logger.server.debug("No Webhook Set or Active, skipping webhook send");
for (const txData of txDataByIds!) {
if (!txData) {
return;
} else {
let webhookConfig: SanitizedWebHooksSchema[] | undefined =
await getWebhookConfig(WebhooksEventTypes.ALL_TX);

if (!webhookConfig) {
switch (txData.status) {
case TransactionStatusEnum.Queued:
webhookConfig = await getWebhookConfig(
WebhooksEventTypes.QUEUED_TX,
);
break;
case TransactionStatusEnum.Submitted:
webhookConfig = await getWebhookConfig(
WebhooksEventTypes.SENT_TX,
);
break;
case TransactionStatusEnum.Retried:
webhookConfig = await getWebhookConfig(
WebhooksEventTypes.RETRIED_TX,
);
break;
case TransactionStatusEnum.Mined:
webhookConfig = await getWebhookConfig(
WebhooksEventTypes.MINED_TX,
);
break;
case TransactionStatusEnum.Errored:
webhookConfig = await getWebhookConfig(
WebhooksEventTypes.ERRORED_TX,
);
break;
case TransactionStatusEnum.Cancelled:
webhookConfig = await getWebhookConfig(
WebhooksEventTypes.ERRORED_TX,
);
break;
}
}

webhookConfig?.map(async (config) => {
if (!config || !config?.active) {
logger.server.debug(
"No Webhook Set or Active, skipping webhook send",
);
return;
}

await sendWebhookRequest(config, txData);
});
}

await sendWebhookRequest(config, txData);
});
}
} catch (error) {
logger.server.error(`[sendWebhook] error: ${error}`);
logger.server.error(error);
}
};

Expand Down Expand Up @@ -171,6 +169,6 @@ export const sendBalanceWebhook = async (
}
});
} catch (error) {
logger.server.error(`[sendWebhook] error: ${error}`);
logger.server.error(error);
}
};
Loading

0 comments on commit c33ed20

Please sign in to comment.