Skip to content

Commit

Permalink
MineTransactionWorker retries more frequently, no cap on resends (#633)
Browse files Browse the repository at this point in the history
* fix: add from address to claimTo to support merkle proofs

* fix: have mine worker retry more frequently

* remove mine worker in retry-failed

* add log line for nonce acquire
  • Loading branch information
arcoraven authored Aug 30, 2024
1 parent 9fd4d9c commit 52981f0
Show file tree
Hide file tree
Showing 5 changed files with 46 additions and 27 deletions.
1 change: 1 addition & 0 deletions src/db/wallets/walletNonce.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ export const deleteAllNonces = async () => {
const keys = [
...(await redis.keys("nonce:*")),
...(await redis.keys("nonce-recycled:*")),
...(await redis.keys("sent-nonce:*")),
];
if (keys.length > 0) {
await redis.del(keys);
Expand Down
15 changes: 12 additions & 3 deletions src/server/routes/transaction/retry-failed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { eth_getTransactionReceipt, getRpcClient } from "thirdweb";
import { TransactionDB } from "../../../db/transactions/db";
import { getChain } from "../../../utils/chain";
import { thirdwebClient } from "../../../utils/sdk";
import { MineTransactionQueue } from "../../../worker/queues/mineTransactionQueue";
import { SendTransactionQueue } from "../../../worker/queues/sendTransactionQueue";
import { createCustomError } from "../../middleware/error";
import { standardResponseSchema } from "../../schemas/sharedApiSchemas";
Expand Down Expand Up @@ -108,15 +109,23 @@ export async function retryFailedTransaction(fastify: FastifyInstance) {
}
}

const job = await SendTransactionQueue.q.getJob(
const sendJob = await SendTransactionQueue.q.getJob(
SendTransactionQueue.jobId({
queueId: transaction.queueId,
resendCount: 0,
}),
);
if (sendJob) {
await sendJob.remove();
}

if (job) {
await job.remove();
const mineJob = await MineTransactionQueue.q.getJob(
MineTransactionQueue.jobId({
queueId: transaction.queueId,
}),
);
if (mineJob) {
await mineJob.remove();
}

await SendTransactionQueue.add({
Expand Down
18 changes: 7 additions & 11 deletions src/worker/queues/mineTransactionQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,8 @@ export type MineTransactionData = {
export class MineTransactionQueue {
static q = new Queue<string>("transactions-2-mine", {
connection: redis,
defaultJobOptions: {
...defaultJobOptions,
// Delay confirming the tx by 500ms.
delay: 500,
// Retry after 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s, 512s, 1024s (17 minutes)
// This needs to be long enough to handle transactions stuck in mempool.
// @TODO: This can be more optimized based on the chain block time.
attempts: 10,
backoff: { type: "exponential", delay: 2_000 },
},
// Backoff strategy is defined on the worker (`BackeoffStrategy`) and when adding to the queue (`attempts`).
defaultJobOptions,
});

// There must be a worker to poll the result for every transaction hash,
Expand All @@ -29,7 +21,11 @@ export class MineTransactionQueue {
static add = async (data: MineTransactionData) => {
const serialized = superjson.stringify(data);
const jobId = this.jobId(data);
await this.q.add(jobId, serialized, { jobId });
await this.q.add(jobId, serialized, {
jobId,
attempts: 200, // > 30 minutes with the backoffStrategy defined on the worker
backoff: { type: "custom" },
});
};

static length = async () => this.q.getWaitingCount();
Expand Down
28 changes: 16 additions & 12 deletions src/worker/tasks/mineTransactionWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,19 +159,17 @@ const _mineTransaction = async (

// Resend the transaction (after some initial delay).
const config = await getConfig();
if (resendCount < config.maxRetriesPerTx) {
const blockNumber = await getBlockNumberish(chainId);
const ellapsedBlocks = blockNumber - sentAtBlock;
if (ellapsedBlocks >= config.minEllapsedBlocksBeforeRetry) {
const message = `Resending transaction after ${ellapsedBlocks} blocks. blockNumber=${blockNumber} sentAtBlock=${sentAtBlock}`;
job.log(message);
logger({ service: "worker", level: "info", queueId, message });
const blockNumber = await getBlockNumberish(chainId);
const ellapsedBlocks = blockNumber - sentAtBlock;
if (ellapsedBlocks >= config.minEllapsedBlocksBeforeRetry) {
const message = `Resending transaction after ${ellapsedBlocks} blocks. blockNumber=${blockNumber} sentAtBlock=${sentAtBlock}`;
job.log(message);
logger({ service: "worker", level: "info", queueId, message });

await SendTransactionQueue.add({
queueId,
resendCount: resendCount + 1,
});
}
await SendTransactionQueue.add({
queueId,
resendCount: resendCount + 1,
});
}

return null;
Expand Down Expand Up @@ -229,6 +227,12 @@ export const initMineTransactionWorker = () => {
const _worker = new Worker(MineTransactionQueue.q.name, handler, {
concurrency: env.CONFIRM_TRANSACTION_QUEUE_CONCURRENCY,
connection: redis,
settings: {
backoffStrategy: (attemptsMade: number) => {
// Retries after: 2s, 4s, 6s, 8s, 10s, 10s, 10s, 10s, ...
return Math.min(attemptsMade * 2_000, 10_000);
},
},
});

// If a transaction fails to mine after all retries, set it as errored and release the nonce.
Expand Down
11 changes: 10 additions & 1 deletion src/worker/tasks/sendTransactionWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
isReplacementGasFeeTooLow,
prettifyError,
} from "../../utils/error";
import { logger } from "../../utils/logger";
import { getChecksumAddress } from "../../utils/primitiveTypes";
import { redis } from "../../utils/redis/redis";
import { thirdwebClient } from "../../utils/sdk";
Expand Down Expand Up @@ -181,7 +182,15 @@ const _sendTransaction = async (

// Acquire an unused nonce for this transaction.
const { nonce, isRecycledNonce } = await acquireNonce(chainId, from);
job.log(`Acquired nonce ${nonce}. isRecycledNonce=${isRecycledNonce}`);
job.log(
`Acquired nonce ${nonce} for transaction ${queuedTransaction.queueId}. isRecycledNonce=${isRecycledNonce}`,
);
logger({
level: "info",
message: `Acquired nonce ${nonce} for transaction ${queuedTransaction.queueId}. isRecycledNonce=${isRecycledNonce}`,
service: "worker",
});

populatedTransaction.nonce = nonce;
job.log(`Sending transaction: ${stringify(populatedTransaction)}`);

Expand Down

0 comments on commit 52981f0

Please sign in to comment.