Skip to content

Commit

Permalink
Migrate worker configuration to database (#224)
Browse files Browse the repository at this point in the history
  • Loading branch information
adam-maj authored Oct 16, 2023
1 parent 4ac5a5c commit b6f8dbf
Show file tree
Hide file tree
Showing 15 changed files with 107 additions and 92 deletions.
17 changes: 1 addition & 16 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,6 @@ OPENAPI_BASE_ORIGIN=http://localhost:3005

# CONFIGURATION OPTIONS [Optional]
# ----------------------
# the number of transactions to queue for before starting processing
MIN_TRANSACTION_TO_PROCESS=1
# the max number of transactions to batch together for processing
TRANSACTIONS_TO_BATCH=10
# Override default RPCs for any chain.
# Can be a URL or file path to a json config file
# See example config file: chain-overrides.example.json
Expand Down Expand Up @@ -69,15 +65,4 @@ BENCHMARK_POST_BODY='{
"args": ["0x1946267d81Fb8aDeeEa28e6B98bcD446c8248473", 100000]
}'
BENCHMARK_CONCURRENCY=10
BENCHMARK_REQUESTS=10

# Retey Gas Max Values (All in wei)
# Default Values
# MAX_FEE_PER_GAS_FOR_RETRY=55000000000 (55 Gwei)
# MAX_PRIORITY_FEE_PER_GAS_FOR_RETRY=55000000000 (55 Gwei)
# MAX_RETRIES_FOR_TX=3
# MAX_BLOCKS_ELAPSED_BEFORE_RETRY=50
# RETRY_TX_CRON_SCHEDULE=*/30 * * * * *
# RETRY_TX_ENABLED=true
MAX_FEE_PER_GAS_FOR_RETRY=55000000000
MAX_PRIORITY_FEE_PER_GAS_FOR_RETRY=55000000000
BENCHMARK_REQUESTS=10
20 changes: 8 additions & 12 deletions docs/1-user-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,14 @@

### Environment Variables

| Variable Name | Description | Default Value | Required |
| ---------------------------------- | ------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------- | -------- |
| `THIRDWEB_API_SECRET_KEY` | thirdweb Api Secret Key (get it from thirdweb.com/dashboard) | ||
| `POSTGRES_CONNECTION_URL` | PostgreSQL Connection string | postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable ||
| `HOST` | Host name of the API Server | `localhost` ||
| `PORT` | Port number of the API Server | `3005` ||
| `TRANSACTIONS_TO_BATCH` | Number of transactions to batch process at a time. | `10` ||
| `CHAIN_OVERRIDES` | Pass your own RPC urls to override the default ones. This can be file or an URL. See example override-rpc-urls.json | ||
| `OPENAPI_BASE_ORIGIN` | Base URL for Open API Specification. Should be the Base URL of your App. | `http://localhost:3005` ||
| `MINED_TX_CRON_ENABLED` | Flag to indicate whether to run the cron job to check mined transactions. | `true` ||
| `MINED_TX_CRON_SCHEDULE` | Cron Schedule for the cron job to check mined transactions. | `*/30 * * * *` ||
| `MIN_TX_TO_CHECK_FOR_MINED_STATUS` | Number of transactions to check for mined status at a time. | `50` ||
| Variable Name | Description | Default Value | Required |
| ------------------------- | ------------------------------------------------------------------------------------------------------------------- | -------------------------------------------------------------------- | -------- |
| `THIRDWEB_API_SECRET_KEY` | thirdweb Api Secret Key (get it from thirdweb.com/dashboard) | ||
| `POSTGRES_CONNECTION_URL` | PostgreSQL Connection string | postgres://postgres:postgres@localhost:5432/postgres?sslmode=disable ||
| `HOST` | Host name of the API Server | `localhost` ||
| `PORT` | Port number of the API Server | `3005` ||
| `CHAIN_OVERRIDES` | Pass your own RPC urls to override the default ones. This can be file or an URL. See example override-rpc-urls.json | ||
| `OPENAPI_BASE_ORIGIN` | Base URL for Open API Specification. Should be the Base URL of your App. | `http://localhost:3005` ||

### Setup Instructions

Expand Down
26 changes: 26 additions & 0 deletions src/db/configuration/getConfiguration.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import { Configuration } from "@prisma/client";
import { prisma } from "../client";

export const getConfiguration = async (): Promise<Configuration> => {
const config = await prisma.configuration.findFirst();

if (config) {
// If we have a configuration object already setup, use it directly
return config;
}

// Here we set all our defaults when first creating the configuration
return prisma.configuration.create({
data: {
minTxsToProcess: 1,
maxTxsToProcess: 10,
minedTxListenerCronSchedule: "*/5 * * * * *",
maxTxsToUpdate: 50,
retryTxListenerCronSchedule: "*/30 * * * * *",
minEllapsedBlocksBeforeRetry: 15,
maxFeePerGasForRetries: "55000000000",
maxPriorityFeePerGasForRetries: "55000000000",
maxRetriesPerTx: 3,
},
});
};
5 changes: 3 additions & 2 deletions src/db/transactions/getQueuedTxs.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
import { Static } from "@sinclair/typebox";
import { transactionResponseSchema } from "../../../server/schemas/transaction";
import { PrismaTransaction } from "../../schema/prisma";
import { env } from "../../utils/env";
import { getPrismaWithPostgresTx } from "../client";
import { getConfiguration } from "../configuration/getConfiguration";
import { cleanTxs } from "./cleanTxs";

interface GetQueuedTxsParams {
Expand All @@ -13,6 +13,7 @@ export const getQueuedTxs = async ({ pgtx }: GetQueuedTxsParams = {}): Promise<
Static<typeof transactionResponseSchema>[]
> => {
const prisma = getPrismaWithPostgresTx(pgtx);
const config = await getConfiguration();

// TODO: Don't use env var for transactions to batch
const txs = await prisma.$queryRaw`
Expand All @@ -29,7 +30,7 @@ ORDER BY
"queuedAt"
ASC
LIMIT
${env.TRANSACTIONS_TO_BATCH}
${config.maxTxsToProcess}
FOR UPDATE SKIP LOCKED
`;

Expand Down
5 changes: 3 additions & 2 deletions src/db/transactions/getSentTxs.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Transactions } from "@prisma/client";
import { PrismaTransaction } from "../../schema/prisma";
import { env } from "../../utils/env";
import { getPrismaWithPostgresTx } from "../client";
import { getConfiguration } from "../configuration/getConfiguration";

interface GetSentTxsParams {
pgtx?: PrismaTransaction;
Expand All @@ -11,6 +11,7 @@ export const getSentTxs = async ({ pgtx }: GetSentTxsParams = {}): Promise<
Transactions[]
> => {
const prisma = getPrismaWithPostgresTx(pgtx);
const config = await getConfiguration();

return prisma.transactions.findMany({
where: {
Expand All @@ -36,6 +37,6 @@ export const getSentTxs = async ({ pgtx }: GetSentTxsParams = {}): Promise<
sentAt: "asc",
},
],
take: env.MIN_TX_TO_CHECK_FOR_MINED_STATUS,
take: config.maxTxsToUpdate,
});
};
5 changes: 3 additions & 2 deletions src/db/transactions/getSentUserOps.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Transactions } from "@prisma/client";
import { PrismaTransaction } from "../../schema/prisma";
import { env } from "../../utils/env";
import { getPrismaWithPostgresTx } from "../client";
import { getConfiguration } from "../configuration/getConfiguration";

interface GetSentUserOpsParams {
pgtx?: PrismaTransaction;
Expand All @@ -11,6 +11,7 @@ export const getSentUserOps = async ({
pgtx,
}: GetSentUserOpsParams = {}): Promise<Transactions[]> => {
const prisma = getPrismaWithPostgresTx(pgtx);
const config = await getConfiguration();

return prisma.transactions.findMany({
where: {
Expand Down Expand Up @@ -38,6 +39,6 @@ export const getSentUserOps = async ({
sentAt: "asc",
},
],
take: env.MIN_TX_TO_CHECK_FOR_MINED_STATUS,
take: config.maxTxsToUpdate,
});
};
5 changes: 3 additions & 2 deletions src/db/transactions/getTxToRetry.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Transactions } from "@prisma/client";
import type { PrismaTransaction } from "../../schema/prisma";
import { env } from "../../utils/env";
import { getPrismaWithPostgresTx } from "../client";
import { getConfiguration } from "../configuration/getConfiguration";

interface GetTxToRetryParams {
pgtx?: PrismaTransaction;
Expand All @@ -11,6 +11,7 @@ export const getTxToRetry = async ({ pgtx }: GetTxToRetryParams = {}): Promise<
Transactions | undefined
> => {
const prisma = getPrismaWithPostgresTx(pgtx);
const config = await getConfiguration();

// TODO: Remove transactionHash
// TODO: For now, we're not retrying user ops
Expand All @@ -26,7 +27,7 @@ WHERE
AND "minedAt" IS NULL
AND "errorMessage" IS NULL
AND "transactionHash" IS NOT NULL
AND "retryCount" < ${env.MAX_RETRIES_FOR_TX}
AND "retryCount" < ${config.maxRetriesPerTx}
ORDER BY
"queuedAt"
ASC
Expand Down
15 changes: 15 additions & 0 deletions src/prisma/migrations/20231016220744_configuration/migration.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-- CreateTable
CREATE TABLE "configuration" (
"id" TEXT NOT NULL,
"minTxsToProcess" INTEGER NOT NULL,
"maxTxsToProcess" INTEGER NOT NULL,
"minedTxsCronSchedule" TEXT,
"maxTxsToUpdate" INTEGER NOT NULL,
"retryTxsCronSchedule" TEXT,
"minEllapsedBlocksBeforeRetry" INTEGER NOT NULL,
"maxFeePerGasForRetries" TEXT NOT NULL,
"maxPriorityFeePerGasForRetries" TEXT NOT NULL,
"maxRetriesPerTx" INTEGER NOT NULL,

CONSTRAINT "configuration_pkey" PRIMARY KEY ("id")
);
18 changes: 18 additions & 0 deletions src/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,24 @@ generator client {
provider = "prisma-client-js"
}

model Configuration {
id String @id @default(uuid()) @map("id")
// Tx Processing
minTxsToProcess Int @map("minTxsToProcess")
maxTxsToProcess Int @map("maxTxsToProcess")
// Tx Updates
minedTxListenerCronSchedule String? @map("minedTxsCronSchedule")
maxTxsToUpdate Int @map("maxTxsToUpdate")
// Tx Retries
retryTxListenerCronSchedule String? @map("retryTxsCronSchedule")
minEllapsedBlocksBeforeRetry Int @map("minEllapsedBlocksBeforeRetry")
maxFeePerGasForRetries String @map("maxFeePerGasForRetries")
maxPriorityFeePerGasForRetries String @map("maxPriorityFeePerGasForRetries")
maxRetriesPerTx Int @map("maxRetriesPerTx")
@@map("configuration")
}

model WalletDetails {
address String @id @map("address")
type String @map("type")
Expand Down
35 changes: 0 additions & 35 deletions src/utils/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,6 @@ dotenv.config({
override: false,
});

// Boolean schema to validate and transform string "true" or "false" to boolean type
const boolSchema = (defaultBool: "true" | "false") =>
z
.string()
.default(defaultBool)
// only allow "true" or "false"
.refine((s) => s === "true" || s === "false", "must be 'true' or 'false'")
// transform to boolean
.transform((s) => s === "true");

// Schema for validating JSON strings
export const JsonSchema = z.string().refine(
(value) => {
Expand Down Expand Up @@ -89,21 +79,10 @@ export const env = createEnv({
OPENAPI_BASE_ORIGIN: z.string().default("http://localhost:3005"),
PORT: z.coerce.number().default(3005),
HOST: z.string().default("0.0.0.0"),
MIN_TRANSACTION_TO_PROCESS: z.coerce.number().default(1),
TRANSACTIONS_TO_BATCH: z.coerce.number().default(10),
CHAIN_OVERRIDES: z
.union([JsonSchema, UrlSchema, FilePathSchema])
.optional(),
ACCESS_CONTROL_ALLOW_ORIGIN: z.string().default("*"),
MINED_TX_CRON_ENABLED: boolSchema("true"),
MINED_TX_CRON_SCHEDULE: z.string().default("*/5 * * * * *"),
MIN_TX_TO_CHECK_FOR_MINED_STATUS: z.coerce.number().default(50),
RETRY_TX_ENABLED: boolSchema("true"),
MAX_FEE_PER_GAS_FOR_RETRY: z.string().default("55000000000"),
MAX_PRIORITY_FEE_PER_GAS_FOR_RETRY: z.string().default("55000000000"),
MAX_RETRIES_FOR_TX: z.coerce.number().default(3),
RETRY_TX_CRON_SCHEDULE: z.string().default("*/30 * * * * *"),
MAX_BLOCKS_ELAPSED_BEFORE_RETRY: z.coerce.number().default(15),
WEBHOOK_URL: z
.string()
.default("")
Expand Down Expand Up @@ -153,22 +132,8 @@ export const env = createEnv({
PORT: process.env.PORT,
HOST: process.env.HOST,
OPENAPI_BASE_ORIGIN: process.env.OPENAPI_BASE_ORIGIN,
MIN_TRANSACTION_TO_PROCESS: process.env.MIN_TRANSACTION_TO_PROCESS,
TRANSACTIONS_TO_BATCH: process.env.TRANSACTIONS_TO_BATCH,
CHAIN_OVERRIDES: process.env.CHAIN_OVERRIDES,
ACCESS_CONTROL_ALLOW_ORIGIN: process.env.ACCESS_CONTROL_ALLOW_ORIGIN,
MINED_TX_CRON_ENABLED: process.env.MINED_TX_CRON_ENABLED,
MINED_TX_CRON_SCHEDULE: process.env.MINED_TX_CRON_SCHEDULE,
MIN_TX_TO_CHECK_FOR_MINED_STATUS:
process.env.MIN_TX_TO_CHECK_FOR_MINED_STATUS,
RETRY_TX_ENABLED: process.env.RETRY_TX_ENABLED,
MAX_FEE_PER_GAS_FOR_RETRY: process.env.MAX_FEE_PER_GAS_FOR_RETRY,
MAX_PRIORITY_FEE_PER_GAS_FOR_RETRY:
process.env.MAX_PRIORITY_FEE_PER_GAS_FOR_RETRY,
MAX_RETRIES_FOR_TX: process.env.MAX_RETRIES_FOR_TX,
RETRY_TX_CRON_SCHEDULE: process.env.RETRY_TX_CRON_SCHEDULE,
MAX_BLOCKS_ELAPSED_BEFORE_RETRY:
process.env.MAX_BLOCKS_ELAPSED_BEFORE_RETRY,
WEBHOOK_URL: process.env.WEBHOOK_URL,
WEBHOOK_AUTH_BEARER_TOKEN: process.env.WEBHOOK_AUTH_BEARER_TOKEN,
},
Expand Down
5 changes: 2 additions & 3 deletions src/worker/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,15 @@ import { minedTxListener } from "./listeners/minedTxListener";
import { queuedTxListener } from "./listeners/queuedTxListener";
import { retryTxListener } from "./listeners/retryTxListener";

// TODO: CRONs should be configuration, not environment variables
const worker = async () => {
// Listen for queued transactions to process
await queuedTxListener();

// Poll for transactions stuck in mempool to retry
retryTxListener();
await retryTxListener();

// Poll for mined transactions to update database
minedTxListener();
await minedTxListener();
};

worker();
14 changes: 8 additions & 6 deletions src/worker/listeners/minedTxListener.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
import cron from "node-cron";
import { env } from "../../utils/env";
import { getConfiguration } from "../../db/configuration/getConfiguration";
import { updateMinedTx } from "../tasks/updateMinedTx";
import { updateMinedUserOps } from "../tasks/updateMinedUserOps";

export const minedTxListener = () => {
cron.schedule(env.MINED_TX_CRON_SCHEDULE, async () => {
if (!env.MINED_TX_CRON_ENABLED) {
return;
}
export const minedTxListener = async () => {
const config = await getConfiguration();

if (!config.minedTxListenerCronSchedule) {
return;
}

cron.schedule(config.minedTxListenerCronSchedule, async () => {
await updateMinedTx();
await updateMinedUserOps();
});
Expand Down
14 changes: 8 additions & 6 deletions src/worker/listeners/retryTxListener.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
import cron from "node-cron";
import { env } from "../../utils/env";
import { getConfiguration } from "../../db/configuration/getConfiguration";
import { retryTx } from "../tasks/retryTx";

export const retryTxListener = () => {
cron.schedule(env.RETRY_TX_CRON_SCHEDULE, async () => {
if (!env.RETRY_TX_ENABLED) {
return;
}
export const retryTxListener = async () => {
const config = await getConfiguration();

if (!config.retryTxListenerCronSchedule) {
return;
}

cron.schedule(config.retryTxListenerCronSchedule, async () => {
await retryTx();
});
};
5 changes: 3 additions & 2 deletions src/worker/tasks/processTx.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import {
} from "../../../server/schemas/transaction";
import { getSdk } from "../../../server/utils/cache/getSdk";
import { prisma } from "../../db/client";
import { getConfiguration } from "../../db/configuration/getConfiguration";
import { getQueuedTxs } from "../../db/transactions/getQueuedTxs";
import { updateTx } from "../../db/transactions/updateTx";
import { getWalletNonce } from "../../db/wallets/getWalletNonce";
import { updateWalletNonce } from "../../db/wallets/updateWalletNonce";
import { env } from "../../utils/env";
import { logger } from "../../utils/logger";
import { randomNonce } from "../utils/nonce";

Expand All @@ -37,7 +37,8 @@ export const processTx = async () => {
// 1. Select a batch of transactions and lock the rows so no other workers pick them up
const txs = await getQueuedTxs({ pgtx });

if (txs.length < env.MIN_TRANSACTION_TO_PROCESS) {
const config = await getConfiguration();
if (txs.length < config.minTxsToProcess) {
return;
}

Expand Down
Loading

0 comments on commit b6f8dbf

Please sign in to comment.