From b6f8dbf4711609eb0848bc1b2e6d3099c109b0b0 Mon Sep 17 00:00:00 2001 From: Adam Majmudar <64697628+adam-maj@users.noreply.github.com> Date: Mon, 16 Oct 2023 15:20:06 -0700 Subject: [PATCH] Migrate worker configuration to database (#224) --- .env.example | 17 +-------- docs/1-user-guide.md | 20 +++++------ src/db/configuration/getConfiguration.ts | 26 ++++++++++++++ src/db/transactions/getQueuedTxs.ts | 5 +-- src/db/transactions/getSentTxs.ts | 5 +-- src/db/transactions/getSentUserOps.ts | 5 +-- src/db/transactions/getTxToRetry.ts | 5 +-- .../migration.sql | 15 ++++++++ src/prisma/schema.prisma | 18 ++++++++++ src/utils/env.ts | 35 ------------------- src/worker/index.ts | 5 ++- src/worker/listeners/minedTxListener.ts | 14 ++++---- src/worker/listeners/retryTxListener.ts | 14 ++++---- src/worker/tasks/processTx.ts | 5 +-- src/worker/tasks/retryTx.ts | 10 +++--- 15 files changed, 107 insertions(+), 92 deletions(-) create mode 100644 src/db/configuration/getConfiguration.ts create mode 100644 src/prisma/migrations/20231016220744_configuration/migration.sql diff --git a/.env.example b/.env.example index 47f1f44ca..31158b9b5 100644 --- a/.env.example +++ b/.env.example @@ -38,10 +38,6 @@ OPENAPI_BASE_ORIGIN=http://localhost:3005 # CONFIGURATION OPTIONS [Optional] # ---------------------- -# the number of transactions to queue for before starting processing -MIN_TRANSACTION_TO_PROCESS=1 -# the max number of transactions to batch together for processing -TRANSACTIONS_TO_BATCH=10 # Override default RPCs for any chain. # Can be a URL or file path to a json config file # See example config file: chain-overrides.example.json @@ -69,15 +65,4 @@ BENCHMARK_POST_BODY='{ "args": ["0x1946267d81Fb8aDeeEa28e6B98bcD446c8248473", 100000] }' BENCHMARK_CONCURRENCY=10 -BENCHMARK_REQUESTS=10 - -# Retey Gas Max Values (All in wei) -# Default Values -# MAX_FEE_PER_GAS_FOR_RETRY=55000000000 (55 Gwei) -# MAX_PRIORITY_FEE_PER_GAS_FOR_RETRY=55000000000 (55 Gwei) -# MAX_RETRIES_FOR_TX=3 -# MAX_BLOCKS_ELAPSED_BEFORE_RETRY=50 -# RETRY_TX_CRON_SCHEDULE=*/30 * * * * * -# RETRY_TX_ENABLED=true -MAX_FEE_PER_GAS_FOR_RETRY=55000000000 -MAX_PRIORITY_FEE_PER_GAS_FOR_RETRY=55000000000 \ No newline at end of file +BENCHMARK_REQUESTS=10 \ No newline at end of file diff --git a/docs/1-user-guide.md b/docs/1-user-guide.md index 2b3e94e4c..4629e489e 100644 --- a/docs/1-user-guide.md +++ b/docs/1-user-guide.md @@ -4,18 +4,14 @@ ### Environment Variables -| Variable Name | Description | Default Value | Required | -| ---------------------------------- | ------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------- | -------- | -| `THIRDWEB_API_SECRET_KEY` | thirdweb Api Secret Key (get it from thirdweb.com/dashboard) | | ✅ | -| `POSTGRES_CONNECTION_URL` | PostgreSQL Connection string | postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable | ✅ | -| `HOST` | Host name of the API Server | `localhost` | ❌ | -| `PORT` | Port number of the API Server | `3005` | ❌ | -| `TRANSACTIONS_TO_BATCH` | Number of transactions to batch process at a time. | `10` | ❌ | -| `CHAIN_OVERRIDES` | Pass your own RPC urls to override the default ones. This can be file or an URL. See example override-rpc-urls.json | | ❌ | -| `OPENAPI_BASE_ORIGIN` | Base URL for Open API Specification. Should be the Base URL of your App. | `http://localhost:3005` | ❌ | -| `MINED_TX_CRON_ENABLED` | Flag to indicate whether to run the cron job to check mined transactions. | `true` | ❌ | -| `MINED_TX_CRON_SCHEDULE` | Cron Schedule for the cron job to check mined transactions. | `*/30 * * * *` | ❌ | -| `MIN_TX_TO_CHECK_FOR_MINED_STATUS` | Number of transactions to check for mined status at a time. | `50` | ❌ | +| Variable Name | Description | Default Value | Required | +| ------------------------- | ------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------- | -------- | +| `THIRDWEB_API_SECRET_KEY` | thirdweb Api Secret Key (get it from thirdweb.com/dashboard) | | ✅ | +| `POSTGRES_CONNECTION_URL` | PostgreSQL Connection string | postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable | ✅ | +| `HOST` | Host name of the API Server | `localhost` | ❌ | +| `PORT` | Port number of the API Server | `3005` | ❌ | +| `CHAIN_OVERRIDES` | Pass your own RPC urls to override the default ones. This can be file or an URL. See example override-rpc-urls.json | | ❌ | +| `OPENAPI_BASE_ORIGIN` | Base URL for Open API Specification. Should be the Base URL of your App. | `http://localhost:3005` | ❌ | ### Setup Instructions diff --git a/src/db/configuration/getConfiguration.ts b/src/db/configuration/getConfiguration.ts new file mode 100644 index 000000000..91bd7bdfc --- /dev/null +++ b/src/db/configuration/getConfiguration.ts @@ -0,0 +1,26 @@ +import { Configuration } from "@prisma/client"; +import { prisma } from "../client"; + +export const getConfiguration = async (): Promise => { + const config = await prisma.configuration.findFirst(); + + if (config) { + // If we have a configuration object already setup, use it directly + return config; + } + + // Here we set all our defaults when first creating the configuration + return prisma.configuration.create({ + data: { + minTxsToProcess: 1, + maxTxsToProcess: 10, + minedTxListenerCronSchedule: "*/5 * * * * *", + maxTxsToUpdate: 50, + retryTxListenerCronSchedule: "*/30 * * * * *", + minEllapsedBlocksBeforeRetry: 15, + maxFeePerGasForRetries: "55000000000", + maxPriorityFeePerGasForRetries: "55000000000", + maxRetriesPerTx: 3, + }, + }); +}; diff --git a/src/db/transactions/getQueuedTxs.ts b/src/db/transactions/getQueuedTxs.ts index 170f6bdeb..766533539 100644 --- a/src/db/transactions/getQueuedTxs.ts +++ b/src/db/transactions/getQueuedTxs.ts @@ -1,8 +1,8 @@ import { Static } from "@sinclair/typebox"; import { transactionResponseSchema } from "../../../server/schemas/transaction"; import { PrismaTransaction } from "../../schema/prisma"; -import { env } from "../../utils/env"; import { getPrismaWithPostgresTx } from "../client"; +import { getConfiguration } from "../configuration/getConfiguration"; import { cleanTxs } from "./cleanTxs"; interface GetQueuedTxsParams { @@ -13,6 +13,7 @@ export const getQueuedTxs = async ({ pgtx }: GetQueuedTxsParams = {}): Promise< Static[] > => { const prisma = getPrismaWithPostgresTx(pgtx); + const config = await getConfiguration(); // TODO: Don't use env var for transactions to batch const txs = await prisma.$queryRaw` @@ -29,7 +30,7 @@ ORDER BY "queuedAt" ASC LIMIT - ${env.TRANSACTIONS_TO_BATCH} + ${config.maxTxsToProcess} FOR UPDATE SKIP LOCKED `; diff --git a/src/db/transactions/getSentTxs.ts b/src/db/transactions/getSentTxs.ts index 9b84deaca..7f4647934 100644 --- a/src/db/transactions/getSentTxs.ts +++ b/src/db/transactions/getSentTxs.ts @@ -1,7 +1,7 @@ import { Transactions } from "@prisma/client"; import { PrismaTransaction } from "../../schema/prisma"; -import { env } from "../../utils/env"; import { getPrismaWithPostgresTx } from "../client"; +import { getConfiguration } from "../configuration/getConfiguration"; interface GetSentTxsParams { pgtx?: PrismaTransaction; @@ -11,6 +11,7 @@ export const getSentTxs = async ({ pgtx }: GetSentTxsParams = {}): Promise< Transactions[] > => { const prisma = getPrismaWithPostgresTx(pgtx); + const config = await getConfiguration(); return prisma.transactions.findMany({ where: { @@ -36,6 +37,6 @@ export const getSentTxs = async ({ pgtx }: GetSentTxsParams = {}): Promise< sentAt: "asc", }, ], - take: env.MIN_TX_TO_CHECK_FOR_MINED_STATUS, + take: config.maxTxsToUpdate, }); }; diff --git a/src/db/transactions/getSentUserOps.ts b/src/db/transactions/getSentUserOps.ts index f98594e87..f08bfa9c8 100644 --- a/src/db/transactions/getSentUserOps.ts +++ b/src/db/transactions/getSentUserOps.ts @@ -1,7 +1,7 @@ import { Transactions } from "@prisma/client"; import { PrismaTransaction } from "../../schema/prisma"; -import { env } from "../../utils/env"; import { getPrismaWithPostgresTx } from "../client"; +import { getConfiguration } from "../configuration/getConfiguration"; interface GetSentUserOpsParams { pgtx?: PrismaTransaction; @@ -11,6 +11,7 @@ export const getSentUserOps = async ({ pgtx, }: GetSentUserOpsParams = {}): Promise => { const prisma = getPrismaWithPostgresTx(pgtx); + const config = await getConfiguration(); return prisma.transactions.findMany({ where: { @@ -38,6 +39,6 @@ export const getSentUserOps = async ({ sentAt: "asc", }, ], - take: env.MIN_TX_TO_CHECK_FOR_MINED_STATUS, + take: config.maxTxsToUpdate, }); }; diff --git a/src/db/transactions/getTxToRetry.ts b/src/db/transactions/getTxToRetry.ts index aacdafbc8..fd12222fa 100644 --- a/src/db/transactions/getTxToRetry.ts +++ b/src/db/transactions/getTxToRetry.ts @@ -1,7 +1,7 @@ import { Transactions } from "@prisma/client"; import type { PrismaTransaction } from "../../schema/prisma"; -import { env } from "../../utils/env"; import { getPrismaWithPostgresTx } from "../client"; +import { getConfiguration } from "../configuration/getConfiguration"; interface GetTxToRetryParams { pgtx?: PrismaTransaction; @@ -11,6 +11,7 @@ export const getTxToRetry = async ({ pgtx }: GetTxToRetryParams = {}): Promise< Transactions | undefined > => { const prisma = getPrismaWithPostgresTx(pgtx); + const config = await getConfiguration(); // TODO: Remove transactionHash // TODO: For now, we're not retrying user ops @@ -26,7 +27,7 @@ WHERE AND "minedAt" IS NULL AND "errorMessage" IS NULL AND "transactionHash" IS NOT NULL - AND "retryCount" < ${env.MAX_RETRIES_FOR_TX} + AND "retryCount" < ${config.maxRetriesPerTx} ORDER BY "queuedAt" ASC diff --git a/src/prisma/migrations/20231016220744_configuration/migration.sql b/src/prisma/migrations/20231016220744_configuration/migration.sql new file mode 100644 index 000000000..0c101fbd5 --- /dev/null +++ b/src/prisma/migrations/20231016220744_configuration/migration.sql @@ -0,0 +1,15 @@ +-- CreateTable +CREATE TABLE "configuration" ( + "id" TEXT NOT NULL, + "minTxsToProcess" INTEGER NOT NULL, + "maxTxsToProcess" INTEGER NOT NULL, + "minedTxsCronSchedule" TEXT, + "maxTxsToUpdate" INTEGER NOT NULL, + "retryTxsCronSchedule" TEXT, + "minEllapsedBlocksBeforeRetry" INTEGER NOT NULL, + "maxFeePerGasForRetries" TEXT NOT NULL, + "maxPriorityFeePerGasForRetries" TEXT NOT NULL, + "maxRetriesPerTx" INTEGER NOT NULL, + + CONSTRAINT "configuration_pkey" PRIMARY KEY ("id") +); diff --git a/src/prisma/schema.prisma b/src/prisma/schema.prisma index 7738003ef..f773dc480 100644 --- a/src/prisma/schema.prisma +++ b/src/prisma/schema.prisma @@ -7,6 +7,24 @@ generator client { provider = "prisma-client-js" } +model Configuration { + id String @id @default(uuid()) @map("id") + // Tx Processing + minTxsToProcess Int @map("minTxsToProcess") + maxTxsToProcess Int @map("maxTxsToProcess") + // Tx Updates + minedTxListenerCronSchedule String? @map("minedTxsCronSchedule") + maxTxsToUpdate Int @map("maxTxsToUpdate") + // Tx Retries + retryTxListenerCronSchedule String? @map("retryTxsCronSchedule") + minEllapsedBlocksBeforeRetry Int @map("minEllapsedBlocksBeforeRetry") + maxFeePerGasForRetries String @map("maxFeePerGasForRetries") + maxPriorityFeePerGasForRetries String @map("maxPriorityFeePerGasForRetries") + maxRetriesPerTx Int @map("maxRetriesPerTx") + + @@map("configuration") +} + model WalletDetails { address String @id @map("address") type String @map("type") diff --git a/src/utils/env.ts b/src/utils/env.ts index cfdeba7eb..7c8ab3bca 100644 --- a/src/utils/env.ts +++ b/src/utils/env.ts @@ -10,16 +10,6 @@ dotenv.config({ override: false, }); -// Boolean schema to validate and transform string "true" or "false" to boolean type -const boolSchema = (defaultBool: "true" | "false") => - z - .string() - .default(defaultBool) - // only allow "true" or "false" - .refine((s) => s === "true" || s === "false", "must be 'true' or 'false'") - // transform to boolean - .transform((s) => s === "true"); - // Schema for validating JSON strings export const JsonSchema = z.string().refine( (value) => { @@ -89,21 +79,10 @@ export const env = createEnv({ OPENAPI_BASE_ORIGIN: z.string().default("http://localhost:3005"), PORT: z.coerce.number().default(3005), HOST: z.string().default("0.0.0.0"), - MIN_TRANSACTION_TO_PROCESS: z.coerce.number().default(1), - TRANSACTIONS_TO_BATCH: z.coerce.number().default(10), CHAIN_OVERRIDES: z .union([JsonSchema, UrlSchema, FilePathSchema]) .optional(), ACCESS_CONTROL_ALLOW_ORIGIN: z.string().default("*"), - MINED_TX_CRON_ENABLED: boolSchema("true"), - MINED_TX_CRON_SCHEDULE: z.string().default("*/5 * * * * *"), - MIN_TX_TO_CHECK_FOR_MINED_STATUS: z.coerce.number().default(50), - RETRY_TX_ENABLED: boolSchema("true"), - MAX_FEE_PER_GAS_FOR_RETRY: z.string().default("55000000000"), - MAX_PRIORITY_FEE_PER_GAS_FOR_RETRY: z.string().default("55000000000"), - MAX_RETRIES_FOR_TX: z.coerce.number().default(3), - RETRY_TX_CRON_SCHEDULE: z.string().default("*/30 * * * * *"), - MAX_BLOCKS_ELAPSED_BEFORE_RETRY: z.coerce.number().default(15), WEBHOOK_URL: z .string() .default("") @@ -153,22 +132,8 @@ export const env = createEnv({ PORT: process.env.PORT, HOST: process.env.HOST, OPENAPI_BASE_ORIGIN: process.env.OPENAPI_BASE_ORIGIN, - MIN_TRANSACTION_TO_PROCESS: process.env.MIN_TRANSACTION_TO_PROCESS, - TRANSACTIONS_TO_BATCH: process.env.TRANSACTIONS_TO_BATCH, CHAIN_OVERRIDES: process.env.CHAIN_OVERRIDES, ACCESS_CONTROL_ALLOW_ORIGIN: process.env.ACCESS_CONTROL_ALLOW_ORIGIN, - MINED_TX_CRON_ENABLED: process.env.MINED_TX_CRON_ENABLED, - MINED_TX_CRON_SCHEDULE: process.env.MINED_TX_CRON_SCHEDULE, - MIN_TX_TO_CHECK_FOR_MINED_STATUS: - process.env.MIN_TX_TO_CHECK_FOR_MINED_STATUS, - RETRY_TX_ENABLED: process.env.RETRY_TX_ENABLED, - MAX_FEE_PER_GAS_FOR_RETRY: process.env.MAX_FEE_PER_GAS_FOR_RETRY, - MAX_PRIORITY_FEE_PER_GAS_FOR_RETRY: - process.env.MAX_PRIORITY_FEE_PER_GAS_FOR_RETRY, - MAX_RETRIES_FOR_TX: process.env.MAX_RETRIES_FOR_TX, - RETRY_TX_CRON_SCHEDULE: process.env.RETRY_TX_CRON_SCHEDULE, - MAX_BLOCKS_ELAPSED_BEFORE_RETRY: - process.env.MAX_BLOCKS_ELAPSED_BEFORE_RETRY, WEBHOOK_URL: process.env.WEBHOOK_URL, WEBHOOK_AUTH_BEARER_TOKEN: process.env.WEBHOOK_AUTH_BEARER_TOKEN, }, diff --git a/src/worker/index.ts b/src/worker/index.ts index 8c7042e5e..66a7d0d38 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -2,16 +2,15 @@ import { minedTxListener } from "./listeners/minedTxListener"; import { queuedTxListener } from "./listeners/queuedTxListener"; import { retryTxListener } from "./listeners/retryTxListener"; -// TODO: CRONs should be configuration, not environment variables const worker = async () => { // Listen for queued transactions to process await queuedTxListener(); // Poll for transactions stuck in mempool to retry - retryTxListener(); + await retryTxListener(); // Poll for mined transactions to update database - minedTxListener(); + await minedTxListener(); }; worker(); diff --git a/src/worker/listeners/minedTxListener.ts b/src/worker/listeners/minedTxListener.ts index 5affffc43..cba83b771 100644 --- a/src/worker/listeners/minedTxListener.ts +++ b/src/worker/listeners/minedTxListener.ts @@ -1,14 +1,16 @@ import cron from "node-cron"; -import { env } from "../../utils/env"; +import { getConfiguration } from "../../db/configuration/getConfiguration"; import { updateMinedTx } from "../tasks/updateMinedTx"; import { updateMinedUserOps } from "../tasks/updateMinedUserOps"; -export const minedTxListener = () => { - cron.schedule(env.MINED_TX_CRON_SCHEDULE, async () => { - if (!env.MINED_TX_CRON_ENABLED) { - return; - } +export const minedTxListener = async () => { + const config = await getConfiguration(); + if (!config.minedTxListenerCronSchedule) { + return; + } + + cron.schedule(config.minedTxListenerCronSchedule, async () => { await updateMinedTx(); await updateMinedUserOps(); }); diff --git a/src/worker/listeners/retryTxListener.ts b/src/worker/listeners/retryTxListener.ts index e3d8db20c..b58527176 100644 --- a/src/worker/listeners/retryTxListener.ts +++ b/src/worker/listeners/retryTxListener.ts @@ -1,13 +1,15 @@ import cron from "node-cron"; -import { env } from "../../utils/env"; +import { getConfiguration } from "../../db/configuration/getConfiguration"; import { retryTx } from "../tasks/retryTx"; -export const retryTxListener = () => { - cron.schedule(env.RETRY_TX_CRON_SCHEDULE, async () => { - if (!env.RETRY_TX_ENABLED) { - return; - } +export const retryTxListener = async () => { + const config = await getConfiguration(); + if (!config.retryTxListenerCronSchedule) { + return; + } + + cron.schedule(config.retryTxListenerCronSchedule, async () => { await retryTx(); }); }; diff --git a/src/worker/tasks/processTx.ts b/src/worker/tasks/processTx.ts index 7d9ec1781..56a78f0c3 100644 --- a/src/worker/tasks/processTx.ts +++ b/src/worker/tasks/processTx.ts @@ -9,11 +9,11 @@ import { } from "../../../server/schemas/transaction"; import { getSdk } from "../../../server/utils/cache/getSdk"; import { prisma } from "../../db/client"; +import { getConfiguration } from "../../db/configuration/getConfiguration"; import { getQueuedTxs } from "../../db/transactions/getQueuedTxs"; import { updateTx } from "../../db/transactions/updateTx"; import { getWalletNonce } from "../../db/wallets/getWalletNonce"; import { updateWalletNonce } from "../../db/wallets/updateWalletNonce"; -import { env } from "../../utils/env"; import { logger } from "../../utils/logger"; import { randomNonce } from "../utils/nonce"; @@ -37,7 +37,8 @@ export const processTx = async () => { // 1. Select a batch of transactions and lock the rows so no other workers pick them up const txs = await getQueuedTxs({ pgtx }); - if (txs.length < env.MIN_TRANSACTION_TO_PROCESS) { + const config = await getConfiguration(); + if (txs.length < config.minTxsToProcess) { return; } diff --git a/src/worker/tasks/retryTx.ts b/src/worker/tasks/retryTx.ts index ed3f68c1d..b8875686e 100644 --- a/src/worker/tasks/retryTx.ts +++ b/src/worker/tasks/retryTx.ts @@ -4,9 +4,9 @@ import { BigNumber } from "ethers/lib/ethers"; import { TransactionStatusEnum } from "../../../server/schemas/transaction"; import { getSdk } from "../../../server/utils/cache/getSdk"; import { prisma } from "../../db/client"; +import { getConfiguration } from "../../db/configuration/getConfiguration"; import { getTxToRetry } from "../../db/transactions/getTxToRetry"; import { updateTx } from "../../db/transactions/updateTx"; -import { env } from "../../utils/env"; import { logger } from "../../utils/logger"; export const retryTx = async () => { @@ -19,6 +19,8 @@ export const retryTx = async () => { return; } + const config = await getConfiguration(); + const sdk = await getSdk({ chainId: tx.chainId!, walletAddress: tx.fromAddress!, @@ -27,7 +29,7 @@ export const retryTx = async () => { // Only retry if more than the ellapsed blocks before retry has passed if ( blockNumber - tx.sentAtBlockNumber! <= - env.MAX_BLOCKS_ELAPSED_BEFORE_RETRY + config.minEllapsedBlocksBeforeRetry ) { return; } @@ -53,9 +55,9 @@ export const retryTx = async () => { tx.maxFeePerGas = tx.retryMaxFeePerGas!; tx.maxPriorityFeePerGas = tx.maxPriorityFeePerGas!; } else if ( - gasOverrides.maxFeePerGas?.gt(env.MAX_FEE_PER_GAS_FOR_RETRY!) || + gasOverrides.maxFeePerGas?.gt(config.maxFeePerGasForRetries) || gasOverrides.maxPriorityFeePerGas?.gt( - env.MAX_PRIORITY_FEE_PER_GAS_FOR_RETRY!, + config.maxPriorityFeePerGasForRetries, ) ) { logger.worker.warn(