From 77d3cf894ee755db7edbe97ae2500ba213da9a8b Mon Sep 17 00:00:00 2001 From: farhanW3 <132962163+farhanW3@users.noreply.github.com> Date: Mon, 8 Jan 2024 23:12:15 +0530 Subject: [PATCH] Configuration + Webhooks : PG Notify/Trigger (#361) * updated implementation to use configuration cache * changed log level for cache to debug * removed extra export fr cache * updated config end-points to pull latest & return latest in response * removed extra logging for webhooks * cleaned webhooks cache debug logs --- src/db/transactions/getQueuedTxs.ts | 4 +- src/db/transactions/getSentTxs.ts | 4 +- src/db/transactions/getSentUserOps.ts | 4 +- src/db/transactions/getTxToRetry.ts | 4 +- src/db/webhooks/createWebhook.ts | 4 - src/db/webhooks/revokeWebhook.ts | 3 - .../migration.sql | 65 ++++++++++ src/server/middleware/auth.ts | 12 +- .../routes/auth/access-tokens/create.ts | 4 +- src/server/routes/backend-wallet/create.ts | 4 +- src/server/routes/backend-wallet/import.ts | 4 +- src/server/routes/configuration/auth/get.ts | 4 +- .../routes/configuration/auth/update.ts | 5 +- .../backend-wallet-balance/get.ts | 4 +- .../backend-wallet-balance/update.ts | 5 +- src/server/routes/configuration/chains/get.ts | 4 +- .../routes/configuration/chains/update.ts | 4 +- .../routes/configuration/transactions/get.ts | 4 +- .../configuration/transactions/update.ts | 4 +- .../routes/configuration/wallets/get.ts | 4 +- .../routes/configuration/wallets/update.ts | 4 +- src/server/utils/chain.ts | 4 +- .../utils/wallets/createAwsKmsWallet.ts | 4 +- .../utils/wallets/createGcpKmsWallet.ts | 4 +- src/server/utils/wallets/createLocalWallet.ts | 4 +- src/server/utils/wallets/getAwsKmsWallet.ts | 4 +- src/server/utils/wallets/getGcpKmsWallet.ts | 4 +- src/server/utils/wallets/getLocalWallet.ts | 4 +- .../utils/wallets/importAwsKmsWallet.ts | 4 +- .../utils/wallets/importGcpKmsWallet.ts | 4 +- src/server/utils/webhook.ts | 64 +++++----- src/utils/cache/getConfig.ts | 57 +++++++++ src/utils/cache/getSdk.ts | 4 +- src/utils/cache/getWebhook.ts | 14 +- src/utils/env.ts | 4 +- src/worker/index.ts | 16 +++ src/worker/listeners/configListener.ts | 116 +++++++++++++++++ src/worker/listeners/minedTxListener.ts | 11 +- src/worker/listeners/queuedTxListener.ts | 11 +- src/worker/listeners/retryTxListener.ts | 11 +- src/worker/listeners/webhookListener.ts | 120 ++++++++++++++++++ src/worker/tasks/processTx.ts | 4 +- src/worker/tasks/retryTx.ts | 4 +- 43 files changed, 510 insertions(+), 116 deletions(-) create mode 100644 src/prisma/migrations/20231226100100_webhook_config_triggers/migration.sql create mode 100644 src/utils/cache/getConfig.ts create mode 100644 src/worker/listeners/configListener.ts create mode 100644 src/worker/listeners/webhookListener.ts diff --git a/src/db/transactions/getQueuedTxs.ts b/src/db/transactions/getQueuedTxs.ts index 2a6b065ee..e310511fb 100644 --- a/src/db/transactions/getQueuedTxs.ts +++ b/src/db/transactions/getQueuedTxs.ts @@ -2,8 +2,8 @@ import { Transactions } from "@prisma/client"; import { Static } from "@sinclair/typebox"; import { PrismaTransaction } from "../../schema/prisma"; import { transactionResponseSchema } from "../../server/schemas/transaction"; +import { getConfig } from "../../utils/cache/getConfig"; import { getPrismaWithPostgresTx } from "../client"; -import { getConfiguration } from "../configuration/getConfiguration"; import { cleanTxs } from "./cleanTxs"; interface GetQueuedTxsParams { @@ -14,7 +14,7 @@ export const getQueuedTxs = async ({ pgtx }: GetQueuedTxsParams = {}): Promise< Static[] > => { const prisma = getPrismaWithPostgresTx(pgtx); - const config = await getConfiguration(); + const config = await getConfig(); // TODO: Don't use env var for transactions to batch const txs = await prisma.$queryRaw` diff --git a/src/db/transactions/getSentTxs.ts b/src/db/transactions/getSentTxs.ts index b1a0f10c8..7525b8f1b 100644 --- a/src/db/transactions/getSentTxs.ts +++ b/src/db/transactions/getSentTxs.ts @@ -1,7 +1,7 @@ import { Transactions } from "@prisma/client"; import { PrismaTransaction } from "../../schema/prisma"; +import { getConfig } from "../../utils/cache/getConfig"; import { getPrismaWithPostgresTx } from "../client"; -import { getConfiguration } from "../configuration/getConfiguration"; interface GetSentTxsParams { pgtx?: PrismaTransaction; @@ -11,7 +11,7 @@ export const getSentTxs = async ({ pgtx }: GetSentTxsParams = {}): Promise< Transactions[] > => { const prisma = getPrismaWithPostgresTx(pgtx); - const config = await getConfiguration(); + const config = await getConfig(); return prisma.$queryRaw` SELECT * FROM "transactions" diff --git a/src/db/transactions/getSentUserOps.ts b/src/db/transactions/getSentUserOps.ts index 11a12208d..d55484c32 100644 --- a/src/db/transactions/getSentUserOps.ts +++ b/src/db/transactions/getSentUserOps.ts @@ -1,7 +1,7 @@ import { Transactions } from "@prisma/client"; import { PrismaTransaction } from "../../schema/prisma"; +import { getConfig } from "../../utils/cache/getConfig"; import { getPrismaWithPostgresTx } from "../client"; -import { getConfiguration } from "../configuration/getConfiguration"; interface GetSentUserOpsParams { pgtx?: PrismaTransaction; @@ -11,7 +11,7 @@ export const getSentUserOps = async ({ pgtx, }: GetSentUserOpsParams = {}): Promise => { const prisma = getPrismaWithPostgresTx(pgtx); - const config = await getConfiguration(); + const config = await getConfig(); return prisma.$queryRaw` SELECT * FROM "transactions" diff --git a/src/db/transactions/getTxToRetry.ts b/src/db/transactions/getTxToRetry.ts index 2bf7d71a6..bfe54cf64 100644 --- a/src/db/transactions/getTxToRetry.ts +++ b/src/db/transactions/getTxToRetry.ts @@ -1,7 +1,7 @@ import { Transactions } from "@prisma/client"; import type { PrismaTransaction } from "../../schema/prisma"; +import { getConfig } from "../../utils/cache/getConfig"; import { getPrismaWithPostgresTx } from "../client"; -import { getConfiguration } from "../configuration/getConfiguration"; interface GetTxToRetryParams { pgtx?: PrismaTransaction; @@ -11,7 +11,7 @@ export const getTxToRetry = async ({ pgtx }: GetTxToRetryParams = {}): Promise< Transactions | undefined > => { const prisma = getPrismaWithPostgresTx(pgtx); - const config = await getConfiguration(); + const config = await getConfig(); // TODO: Remove transactionHash // TODO: For now, we're not retrying user ops diff --git a/src/db/webhooks/createWebhook.ts b/src/db/webhooks/createWebhook.ts index cbd759f39..f02a5e883 100644 --- a/src/db/webhooks/createWebhook.ts +++ b/src/db/webhooks/createWebhook.ts @@ -1,6 +1,5 @@ import { createHash, randomBytes } from "crypto"; import { WebhooksEventTypes } from "../../schema/webhooks"; -import { webhookCache } from "../../utils/cache/getWebhook"; import { prisma } from "../client"; interface CreateWebhooksParams { @@ -19,9 +18,6 @@ export const insertWebhook = async ({ // hash the bytes to create the secret (this will not be stored by itself) const secret = createHash("sha512").update(bytes).digest("base64url"); - // Clear Cache - webhookCache.clear(); - return prisma.webhooks.create({ data: { url, diff --git a/src/db/webhooks/revokeWebhook.ts b/src/db/webhooks/revokeWebhook.ts index e754d18bb..9d85f63bd 100644 --- a/src/db/webhooks/revokeWebhook.ts +++ b/src/db/webhooks/revokeWebhook.ts @@ -1,6 +1,5 @@ import { StatusCodes } from "http-status-codes"; import { createCustomError } from "../../server/middleware/error"; -import { webhookCache } from "../../utils/cache/getWebhook"; import { prisma } from "../client"; interface RevokeWebhooksParams { @@ -8,8 +7,6 @@ interface RevokeWebhooksParams { } export const markWebhookAsRevoked = async ({ id }: RevokeWebhooksParams) => { - // Clear Cache - webhookCache.clear(); const currentTimestamp = new Date(); const exists = await prisma.webhooks.findUnique({ diff --git a/src/prisma/migrations/20231226100100_webhook_config_triggers/migration.sql b/src/prisma/migrations/20231226100100_webhook_config_triggers/migration.sql new file mode 100644 index 000000000..ea173e575 --- /dev/null +++ b/src/prisma/migrations/20231226100100_webhook_config_triggers/migration.sql @@ -0,0 +1,65 @@ +-- Configuration Triggers +CREATE OR REPLACE FUNCTION notify_configuration_insert() + RETURNS TRIGGER + LANGUAGE plpgsql +AS $function$ +BEGIN + PERFORM pg_notify('new_configuration_data', row_to_json(NEW)::text); + RETURN NEW; +END; +$function$; + +CREATE OR REPLACE FUNCTION notify_configuration_update() + RETURNS TRIGGER + LANGUAGE plpgsql +AS $function$ +BEGIN + PERFORM pg_notify('updated_configuration_data', json_build_object( + 'id', NEW.id + )::text); + RETURN NEW; +END; +$function$; + +CREATE OR REPLACE TRIGGER configuration_insert_trigger + AFTER INSERT ON configuration + FOR EACH ROW + EXECUTE FUNCTION notify_configuration_insert(); + +CREATE OR REPLACE TRIGGER configuration_update_trigger + AFTER UPDATE ON configuration + FOR EACH ROW + EXECUTE FUNCTION notify_configuration_update(); + +-- Webhooks Triggers +CREATE OR REPLACE FUNCTION notify_webhooks_insert() + RETURNS TRIGGER + LANGUAGE plpgsql +AS $function$ +BEGIN + PERFORM pg_notify('new_webhook_data', row_to_json(NEW)::text); + RETURN NEW; +END; +$function$; + +CREATE OR REPLACE FUNCTION notify_webhooks_update() + RETURNS TRIGGER + LANGUAGE plpgsql +AS $function$ +BEGIN + PERFORM pg_notify('updated_webhook_data', json_build_object( + 'id', NEW.id + )::text); + RETURN NEW; +END; +$function$; + +CREATE OR REPLACE TRIGGER webhooks_insert_trigger + AFTER INSERT ON webhooks + FOR EACH ROW + EXECUTE FUNCTION notify_webhooks_insert(); + +CREATE OR REPLACE TRIGGER webhooks_update_trigger + AFTER UPDATE ON webhooks + FOR EACH ROW + EXECUTE FUNCTION notify_webhooks_update(); \ No newline at end of file diff --git a/src/server/middleware/auth.ts b/src/server/middleware/auth.ts index 29fb5b05b..8d004e2f4 100644 --- a/src/server/middleware/auth.ts +++ b/src/server/middleware/auth.ts @@ -8,14 +8,14 @@ import { GenericAuthWallet, LocalWallet } from "@thirdweb-dev/wallets"; import { AsyncWallet } from "@thirdweb-dev/wallets/evm/wallets/async"; import { utils } from "ethers"; import { FastifyInstance } from "fastify"; -import { getConfiguration } from "../../db/configuration/getConfiguration"; import { updateConfiguration } from "../../db/configuration/updateConfiguration"; import { getPermissions } from "../../db/permissions/getPermissions"; import { createToken } from "../../db/tokens/createToken"; import { getToken } from "../../db/tokens/getToken"; import { revokeToken } from "../../db/tokens/revokeToken"; import { WebhooksEventTypes } from "../../schema/webhooks"; -import { getWebhookConfig } from "../../utils/cache/getWebhook"; +import { getConfig } from "../../utils/cache/getConfig"; +import { getWebhook } from "../../utils/cache/getWebhook"; import { env } from "../../utils/env"; import { logger } from "../../utils/logger"; import { Permission } from "../schemas/auth"; @@ -73,7 +73,7 @@ const authWithApiServer = async (jwt: string, domain: string) => { }; export const withAuth = async (server: FastifyInstance) => { - const config = await getConfiguration(); + const config = await getConfig(); // Configure the ThirdwebAuth fastify plugin const { authRouter, authMiddleware, getUser } = ThirdwebAuth< @@ -86,7 +86,7 @@ export const withAuth = async (server: FastifyInstance) => { wallet: new AsyncWallet({ // TODO: Use caching for the signer getSigner: async () => { - const config = await getConfiguration(); + const config = await getConfig(); const wallet = new LocalWallet(); try { @@ -209,7 +209,7 @@ export const withAuth = async (server: FastifyInstance) => { const thirdwebApiSecretKey = req.headers.authorization?.split(" ")[1]; if (thirdwebApiSecretKey === env.THIRDWEB_API_SECRET_KEY) { // If the secret key is being used, treat the user as the auth wallet - const config = await getConfiguration(); + const config = await getConfig(); const wallet = new LocalWallet(); try { @@ -300,7 +300,7 @@ export const withAuth = async (server: FastifyInstance) => { // no-op } - const authWebhooks = await getWebhookConfig(WebhooksEventTypes.AUTH); + const authWebhooks = await getWebhook(WebhooksEventTypes.AUTH); if (authWebhooks) { const authResponses = await Promise.all( authWebhooks.map((webhook) => diff --git a/src/server/routes/auth/access-tokens/create.ts b/src/server/routes/auth/access-tokens/create.ts index 857f4e62e..cedcf8024 100644 --- a/src/server/routes/auth/access-tokens/create.ts +++ b/src/server/routes/auth/access-tokens/create.ts @@ -3,9 +3,9 @@ import { buildJWT } from "@thirdweb-dev/auth"; import { LocalWallet } from "@thirdweb-dev/wallets"; import { FastifyInstance } from "fastify"; import { StatusCodes } from "http-status-codes"; -import { getConfiguration } from "../../../../db/configuration/getConfiguration"; import { updateConfiguration } from "../../../../db/configuration/updateConfiguration"; import { createToken } from "../../../../db/tokens/createToken"; +import { getConfig } from "../../../../utils/cache/getConfig"; import { env } from "../../../../utils/env"; import { AccessTokenSchema } from "./getAll"; @@ -42,7 +42,7 @@ export async function createAccessToken(fastify: FastifyInstance) { handler: async (req, res) => { const { label } = req.body; - const config = await getConfiguration(); + const config = await getConfig(); const wallet = new LocalWallet(); // TODO: Remove this with next breaking change diff --git a/src/server/routes/backend-wallet/create.ts b/src/server/routes/backend-wallet/create.ts index 0fafe34bc..0dc1dd4b7 100644 --- a/src/server/routes/backend-wallet/create.ts +++ b/src/server/routes/backend-wallet/create.ts @@ -1,8 +1,8 @@ import { Static, Type } from "@sinclair/typebox"; import { FastifyInstance } from "fastify"; import { StatusCodes } from "http-status-codes"; -import { getConfiguration } from "../../../db/configuration/getConfiguration"; import { WalletType } from "../../../schema/wallet"; +import { getConfig } from "../../../utils/cache/getConfig"; import { standardResponseSchema } from "../../schemas/sharedApiSchemas"; import { createAwsKmsWallet } from "../../utils/wallets/createAwsKmsWallet"; import { createGcpKmsWallet } from "../../utils/wallets/createGcpKmsWallet"; @@ -48,7 +48,7 @@ export const createBackendWallet = async (fastify: FastifyInstance) => { const { label } = req.body; let walletAddress: string; - const config = await getConfiguration(); + const config = await getConfig(); switch (config.walletConfiguration.type) { case WalletType.local: walletAddress = await createLocalWallet({ label }); diff --git a/src/server/routes/backend-wallet/import.ts b/src/server/routes/backend-wallet/import.ts index 22fa5129b..93be1463b 100644 --- a/src/server/routes/backend-wallet/import.ts +++ b/src/server/routes/backend-wallet/import.ts @@ -1,8 +1,8 @@ import { Static, Type } from "@sinclair/typebox"; import { FastifyInstance } from "fastify"; import { StatusCodes } from "http-status-codes"; -import { getConfiguration } from "../../../db/configuration/getConfiguration"; import { WalletType } from "../../../schema/wallet"; +import { getConfig } from "../../../utils/cache/getConfig"; import { standardResponseSchema } from "../../schemas/sharedApiSchemas"; import { importAwsKmsWallet } from "../../utils/wallets/importAwsKmsWallet"; import { importGcpKmsWallet } from "../../utils/wallets/importGcpKmsWallet"; @@ -111,7 +111,7 @@ export const importBackendWallet = async (fastify: FastifyInstance) => { }, handler: async (request, reply) => { let walletAddress: string; - const config = await getConfiguration(); + const config = await getConfig(); switch (config.walletConfiguration.type) { case WalletType.local: // TODO: This is why where zod would be great diff --git a/src/server/routes/configuration/auth/get.ts b/src/server/routes/configuration/auth/get.ts index 7ae8a3ec5..def33214d 100644 --- a/src/server/routes/configuration/auth/get.ts +++ b/src/server/routes/configuration/auth/get.ts @@ -1,7 +1,7 @@ import { Static, Type } from "@sinclair/typebox"; import { FastifyInstance } from "fastify"; import { StatusCodes } from "http-status-codes"; -import { getConfiguration } from "../../../../db/configuration/getConfiguration"; +import { getConfig } from "../../../../utils/cache/getConfig"; export const ReplySchema = Type.Object({ result: Type.Object({ @@ -25,7 +25,7 @@ export async function getAuthConfiguration(fastify: FastifyInstance) { }, }, handler: async (req, res) => { - const config = await getConfiguration(); + const config = await getConfig(); res.status(200).send({ result: { domain: config.authDomain, diff --git a/src/server/routes/configuration/auth/update.ts b/src/server/routes/configuration/auth/update.ts index 0a83d9171..d4407f5fa 100644 --- a/src/server/routes/configuration/auth/update.ts +++ b/src/server/routes/configuration/auth/update.ts @@ -2,6 +2,7 @@ import { Static, Type } from "@sinclair/typebox"; import { FastifyInstance } from "fastify"; import { StatusCodes } from "http-status-codes"; import { updateConfiguration } from "../../../../db/configuration/updateConfiguration"; +import { getConfig } from "../../../../utils/cache/getConfig"; import { ReplySchema } from "./get"; export const BodySchema = Type.Object({ @@ -26,10 +27,12 @@ export async function updateAuthConfiguration(fastify: FastifyInstance) { }, }, handler: async (req, res) => { - const config = await updateConfiguration({ + await updateConfiguration({ authDomain: req.body.domain, }); + const config = await getConfig(false); + res.status(200).send({ result: { domain: config.authDomain, diff --git a/src/server/routes/configuration/backend-wallet-balance/get.ts b/src/server/routes/configuration/backend-wallet-balance/get.ts index 35831b08b..62d6b654d 100644 --- a/src/server/routes/configuration/backend-wallet-balance/get.ts +++ b/src/server/routes/configuration/backend-wallet-balance/get.ts @@ -1,7 +1,7 @@ import { Static, Type } from "@sinclair/typebox"; import { FastifyInstance } from "fastify"; import { StatusCodes } from "http-status-codes"; -import { getConfiguration } from "../../../../db/configuration/getConfiguration"; +import { getConfig } from "../../../../utils/cache/getConfig"; import { standardResponseSchema } from "../../../schemas/sharedApiSchemas"; export const ReplySchema = Type.Object({ @@ -31,7 +31,7 @@ export async function getBackendWalletBalanceConfiguration( }, }, handler: async (req, res) => { - const config = await getConfiguration(); + const config = await getConfig(); res.status(200).send({ result: { minWalletBalance: config.minWalletBalance, diff --git a/src/server/routes/configuration/backend-wallet-balance/update.ts b/src/server/routes/configuration/backend-wallet-balance/update.ts index 74ccfd2ef..f7af42343 100644 --- a/src/server/routes/configuration/backend-wallet-balance/update.ts +++ b/src/server/routes/configuration/backend-wallet-balance/update.ts @@ -2,6 +2,7 @@ import { Static, Type } from "@sinclair/typebox"; import { FastifyInstance } from "fastify"; import { StatusCodes } from "http-status-codes"; import { updateConfiguration } from "../../../../db/configuration/updateConfiguration"; +import { getConfig } from "../../../../utils/cache/getConfig"; import { standardResponseSchema } from "../../../schemas/sharedApiSchemas"; import { ReplySchema } from "./get"; @@ -33,7 +34,9 @@ export async function updateBackendWalletBalanceConfiguration( }, }, handler: async (req, res) => { - const config = await updateConfiguration({ ...req.body }); + await updateConfiguration({ ...req.body }); + const config = await getConfig(false); + res.status(200).send({ result: { minWalletBalance: config.minWalletBalance, diff --git a/src/server/routes/configuration/chains/get.ts b/src/server/routes/configuration/chains/get.ts index 9a78d00d0..106faff38 100644 --- a/src/server/routes/configuration/chains/get.ts +++ b/src/server/routes/configuration/chains/get.ts @@ -1,7 +1,7 @@ import { Static, Type } from "@sinclair/typebox"; import { FastifyInstance } from "fastify"; import { StatusCodes } from "http-status-codes"; -import { getConfiguration } from "../../../../db/configuration/getConfiguration"; +import { getConfig } from "../../../../utils/cache/getConfig"; export const ReplySchema = Type.Object({ result: Type.Union([Type.String(), Type.Null()]), @@ -23,7 +23,7 @@ export async function getChainsConfiguration(fastify: FastifyInstance) { }, }, handler: async (req, res) => { - const config = await getConfiguration(); + const config = await getConfig(); res.status(200).send({ result: config.chainOverrides, }); diff --git a/src/server/routes/configuration/chains/update.ts b/src/server/routes/configuration/chains/update.ts index 0e3e5e311..40db43e10 100644 --- a/src/server/routes/configuration/chains/update.ts +++ b/src/server/routes/configuration/chains/update.ts @@ -1,8 +1,8 @@ import { Static, Type } from "@sinclair/typebox"; import { FastifyInstance } from "fastify"; import { StatusCodes } from "http-status-codes"; -import { getConfiguration } from "../../../../db/configuration/getConfiguration"; import { updateConfiguration } from "../../../../db/configuration/updateConfiguration"; +import { getConfig } from "../../../../utils/cache/getConfig"; import { ReplySchema } from "./get"; const BodySchema = Type.Object({ @@ -63,7 +63,7 @@ export async function updateChainsConfiguration(fastify: FastifyInstance) { chainOverrides: JSON.stringify(req.body.chainOverrides), }); - const config = await getConfiguration(); + const config = await getConfig(false); res.status(200).send({ result: config.chainOverrides, }); diff --git a/src/server/routes/configuration/transactions/get.ts b/src/server/routes/configuration/transactions/get.ts index 710a7396c..bc5550ae9 100644 --- a/src/server/routes/configuration/transactions/get.ts +++ b/src/server/routes/configuration/transactions/get.ts @@ -1,7 +1,7 @@ import { Static, Type } from "@sinclair/typebox"; import { FastifyInstance } from "fastify"; import { StatusCodes } from "http-status-codes"; -import { getConfiguration } from "../../../../db/configuration/getConfiguration"; +import { getConfig } from "../../../../utils/cache/getConfig"; export const ReplySchema = Type.Object({ result: Type.Object({ @@ -33,7 +33,7 @@ export async function getTransactionConfiguration(fastify: FastifyInstance) { }, }, handler: async (req, res) => { - const config = await getConfiguration(); + const config = await getConfig(); res.status(200).send({ result: { minTxsToProcess: config.minTxsToProcess, diff --git a/src/server/routes/configuration/transactions/update.ts b/src/server/routes/configuration/transactions/update.ts index a1c30761c..2fcc89007 100644 --- a/src/server/routes/configuration/transactions/update.ts +++ b/src/server/routes/configuration/transactions/update.ts @@ -2,6 +2,7 @@ import { Static, Type } from "@sinclair/typebox"; import { FastifyInstance } from "fastify"; import { StatusCodes } from "http-status-codes"; import { updateConfiguration } from "../../../../db/configuration/updateConfiguration"; +import { getConfig } from "../../../../utils/cache/getConfig"; import { ReplySchema } from "./get"; const BodySchema = Type.Partial( @@ -36,7 +37,8 @@ export async function updateTransactionConfiguration(fastify: FastifyInstance) { }, }, handler: async (req, res) => { - const config = await updateConfiguration({ ...req.body }); + await updateConfiguration({ ...req.body }); + const config = await getConfig(false); res.status(200).send({ result: { minTxsToProcess: config.minTxsToProcess, diff --git a/src/server/routes/configuration/wallets/get.ts b/src/server/routes/configuration/wallets/get.ts index 93cc9f1fc..da7763ea4 100644 --- a/src/server/routes/configuration/wallets/get.ts +++ b/src/server/routes/configuration/wallets/get.ts @@ -1,8 +1,8 @@ import { Static, Type } from "@sinclair/typebox"; import { FastifyInstance } from "fastify"; import { StatusCodes } from "http-status-codes"; -import { getConfiguration } from "../../../../db/configuration/getConfiguration"; import { WalletType } from "../../../../schema/wallet"; +import { getConfig } from "../../../../utils/cache/getConfig"; export const ReplySchema = Type.Object({ result: Type.Union([ @@ -42,7 +42,7 @@ export async function getWalletsConfiguration(fastify: FastifyInstance) { }, }, handler: async (req, res) => { - const config = await getConfiguration(); + const config = await getConfig(); res.status(200).send({ result: config.walletConfiguration, }); diff --git a/src/server/routes/configuration/wallets/update.ts b/src/server/routes/configuration/wallets/update.ts index 6d6c34d40..33fea9d80 100644 --- a/src/server/routes/configuration/wallets/update.ts +++ b/src/server/routes/configuration/wallets/update.ts @@ -1,9 +1,9 @@ import { Static, Type } from "@sinclair/typebox"; import { FastifyInstance } from "fastify"; import { StatusCodes } from "http-status-codes"; -import { getConfiguration } from "../../../../db/configuration/getConfiguration"; import { updateConfiguration } from "../../../../db/configuration/updateConfiguration"; import { WalletType } from "../../../../schema/wallet"; +import { getConfig } from "../../../../utils/cache/getConfig"; import { ReplySchema } from "./get"; const BodySchema = Type.Union([ @@ -124,7 +124,7 @@ export async function updateWalletsConfiguration(fastify: FastifyInstance) { break; } - const config = await getConfiguration(); + const config = await getConfig(false); res.status(200).send({ result: config.walletConfiguration, }); diff --git a/src/server/utils/chain.ts b/src/server/utils/chain.ts index e5a5354cb..c2adc8f35 100644 --- a/src/server/utils/chain.ts +++ b/src/server/utils/chain.ts @@ -1,6 +1,6 @@ import { Static } from "@sinclair/typebox"; import { allChains, getChainByChainId } from "@thirdweb-dev/chains"; -import { getConfiguration } from "../../db/configuration/getConfiguration"; +import { getConfig } from "../../utils/cache/getConfig"; import { networkResponseSchema } from "../../utils/cache/getSdk"; const ChainNameToChainId = { @@ -20,7 +20,7 @@ export const getChainIdFromChain = async (chain: string): Promise => { chainId = ChainNameToChainId[chain as keyof typeof ChainNameToChainId]; } - const config = await getConfiguration(); + const config = await getConfig(); // If we're passed a valid chain id directly, then use it if (!isNaN(parseInt(chain))) { diff --git a/src/server/utils/wallets/createAwsKmsWallet.ts b/src/server/utils/wallets/createAwsKmsWallet.ts index 2cf595957..fc5fe047b 100644 --- a/src/server/utils/wallets/createAwsKmsWallet.ts +++ b/src/server/utils/wallets/createAwsKmsWallet.ts @@ -1,6 +1,6 @@ import { CreateKeyCommand, KMSClient } from "@aws-sdk/client-kms"; -import { getConfiguration } from "../../../db/configuration/getConfiguration"; import { WalletType } from "../../../schema/wallet"; +import { getConfig } from "../../../utils/cache/getConfig"; import { importAwsKmsWallet } from "./importAwsKmsWallet"; interface CreateAwsKmsWalletParams { @@ -10,7 +10,7 @@ interface CreateAwsKmsWalletParams { export const createAwsKmsWallet = async ({ label, }: CreateAwsKmsWalletParams): Promise => { - const config = await getConfiguration(); + const config = await getConfig(); if (config.walletConfiguration.type !== WalletType.awsKms) { throw new Error(`Server was not configured for AWS KMS wallet creation`); } diff --git a/src/server/utils/wallets/createGcpKmsWallet.ts b/src/server/utils/wallets/createGcpKmsWallet.ts index c018690ec..464fa335f 100644 --- a/src/server/utils/wallets/createGcpKmsWallet.ts +++ b/src/server/utils/wallets/createGcpKmsWallet.ts @@ -1,7 +1,7 @@ import { KeyManagementServiceClient } from "@google-cloud/kms"; -import { getConfiguration } from "../../../db/configuration/getConfiguration"; import { createWalletDetails } from "../../../db/wallets/createWalletDetails"; import { WalletType } from "../../../schema/wallet"; +import { getConfig } from "../../../utils/cache/getConfig"; import { getGcpKmsWallet } from "./getGcpKmsWallet"; interface CreateGcpKmsWallet { @@ -11,7 +11,7 @@ interface CreateGcpKmsWallet { export const createGcpKmsWallet = async ({ label, }: CreateGcpKmsWallet): Promise => { - const config = await getConfiguration(); + const config = await getConfig(); if (config.walletConfiguration.type !== WalletType.gcpKms) { throw new Error(`Server was not configured for GCP KMS wallet creation`); } diff --git a/src/server/utils/wallets/createLocalWallet.ts b/src/server/utils/wallets/createLocalWallet.ts index e838c5038..b6ffd5cd3 100644 --- a/src/server/utils/wallets/createLocalWallet.ts +++ b/src/server/utils/wallets/createLocalWallet.ts @@ -1,6 +1,6 @@ import { LocalWallet } from "@thirdweb-dev/wallets"; -import { getConfiguration } from "../../../db/configuration/getConfiguration"; import { WalletType } from "../../../schema/wallet"; +import { getConfig } from "../../../utils/cache/getConfig"; import { env } from "../../../utils/env"; import { LocalFileStorage } from "../storage/localStorage"; @@ -11,7 +11,7 @@ interface CreateLocalWallet { export const createLocalWallet = async ({ label, }: CreateLocalWallet): Promise => { - const config = await getConfiguration(); + const config = await getConfig(); if (config.walletConfiguration.type !== WalletType.local) { throw new Error(`Server was not configured for local wallet creation.`); } diff --git a/src/server/utils/wallets/getAwsKmsWallet.ts b/src/server/utils/wallets/getAwsKmsWallet.ts index e888a1fc8..ae4863b15 100644 --- a/src/server/utils/wallets/getAwsKmsWallet.ts +++ b/src/server/utils/wallets/getAwsKmsWallet.ts @@ -1,6 +1,6 @@ import { AwsKmsWallet } from "@thirdweb-dev/wallets/evm/wallets/aws-kms"; -import { getConfiguration } from "../../../db/configuration/getConfiguration"; import { WalletType } from "../../../schema/wallet"; +import { getConfig } from "../../../utils/cache/getConfig"; interface GetAwsKmsWalletParams { awsKmsKeyId: string; @@ -9,7 +9,7 @@ interface GetAwsKmsWalletParams { export const getAwsKmsWallet = async ({ awsKmsKeyId, }: GetAwsKmsWalletParams) => { - const config = await getConfiguration(); + const config = await getConfig(); if (config.walletConfiguration.type !== WalletType.awsKms) { throw new Error(`Server was not configured for AWS KMS wallets.`); } diff --git a/src/server/utils/wallets/getGcpKmsWallet.ts b/src/server/utils/wallets/getGcpKmsWallet.ts index 119f84478..78cf3e34c 100644 --- a/src/server/utils/wallets/getGcpKmsWallet.ts +++ b/src/server/utils/wallets/getGcpKmsWallet.ts @@ -1,6 +1,6 @@ import { GcpKmsWallet } from "@thirdweb-dev/wallets/evm/wallets/gcp-kms"; -import { getConfiguration } from "../../../db/configuration/getConfiguration"; import { WalletType } from "../../../schema/wallet"; +import { getConfig } from "../../../utils/cache/getConfig"; interface GetGcpKmsWalletParams { gcpKmsKeyId: string; @@ -11,7 +11,7 @@ export const getGcpKmsWallet = async ({ gcpKmsKeyId, gcpKmsKeyVersionId, }: GetGcpKmsWalletParams) => { - const config = await getConfiguration(); + const config = await getConfig(); if (config.walletConfiguration.type !== WalletType.gcpKms) { throw new Error(`Server was not configured for GCP KMS.`); } diff --git a/src/server/utils/wallets/getLocalWallet.ts b/src/server/utils/wallets/getLocalWallet.ts index 936729c68..fba7e944b 100644 --- a/src/server/utils/wallets/getLocalWallet.ts +++ b/src/server/utils/wallets/getLocalWallet.ts @@ -1,8 +1,8 @@ import { Static } from "@sinclair/typebox"; import { Chain, getChainByChainId } from "@thirdweb-dev/chains"; import { LocalWallet } from "@thirdweb-dev/wallets"; -import { getConfiguration } from "../../../db/configuration/getConfiguration"; import { getWalletDetails } from "../../../db/wallets/getWalletDetails"; +import { getConfig } from "../../../utils/cache/getConfig"; import { networkResponseSchema } from "../../../utils/cache/getSdk"; import { env } from "../../../utils/env"; import { logger } from "../../../utils/logger"; @@ -18,7 +18,7 @@ export const getLocalWallet = async ({ walletAddress, }: GetLocalWalletParams) => { let chain: Chain | undefined = undefined; - const config = await getConfiguration(); + const config = await getConfig(); const CHAIN_OVERRIDES = config.chainOverrides; try { chain = getChainByChainId(chainId); diff --git a/src/server/utils/wallets/importAwsKmsWallet.ts b/src/server/utils/wallets/importAwsKmsWallet.ts index eddb9b36b..104ba98a5 100644 --- a/src/server/utils/wallets/importAwsKmsWallet.ts +++ b/src/server/utils/wallets/importAwsKmsWallet.ts @@ -1,6 +1,6 @@ -import { getConfiguration } from "../../../db/configuration/getConfiguration"; import { createWalletDetails } from "../../../db/wallets/createWalletDetails"; import { WalletType } from "../../../schema/wallet"; +import { getConfig } from "../../../utils/cache/getConfig"; import { getAwsKmsWallet } from "./getAwsKmsWallet"; interface ImportAwsKmsWalletParams { @@ -14,7 +14,7 @@ export const importAwsKmsWallet = async ({ awsKmsArn, label, }: ImportAwsKmsWalletParams) => { - const config = await getConfiguration(); + const config = await getConfig(); if (config.walletConfiguration.type !== WalletType.awsKms) { throw new Error(`Server was not configured for AWS KMS wallet creation`); } diff --git a/src/server/utils/wallets/importGcpKmsWallet.ts b/src/server/utils/wallets/importGcpKmsWallet.ts index 66ee17512..9c94f04ef 100644 --- a/src/server/utils/wallets/importGcpKmsWallet.ts +++ b/src/server/utils/wallets/importGcpKmsWallet.ts @@ -1,6 +1,6 @@ -import { getConfiguration } from "../../../db/configuration/getConfiguration"; import { createWalletDetails } from "../../../db/wallets/createWalletDetails"; import { WalletType } from "../../../schema/wallet"; +import { getConfig } from "../../../utils/cache/getConfig"; import { getGcpKmsWallet } from "./getGcpKmsWallet"; interface ImportGcpKmsWalletParams { @@ -14,7 +14,7 @@ export const importGcpKmsWallet = async ({ gcpKmsKeyVersionId, label, }: ImportGcpKmsWalletParams) => { - const config = await getConfiguration(); + const config = await getConfig(); if (config.walletConfiguration.type !== WalletType.gcpKms) { throw new Error(`Server was not configured for GCP KMS wallet creation`); } diff --git a/src/server/utils/webhook.ts b/src/server/utils/webhook.ts index a916cc7e4..828385a9f 100644 --- a/src/server/utils/webhook.ts +++ b/src/server/utils/webhook.ts @@ -5,7 +5,7 @@ import { WalletBalanceWebhookSchema, WebhooksEventTypes, } from "../../schema/webhooks"; -import { getWebhookConfig } from "../../utils/cache/getWebhook"; +import { getWebhook } from "../../utils/cache/getWebhook"; import { logger } from "../../utils/logger"; import { TransactionStatusEnum } from "../schemas/transaction"; @@ -52,24 +52,34 @@ export const sendWebhookRequest = async ( webhookConfig: SanitizedWebHooksSchema, body: Record, ): Promise => { - const headers = await createWebhookRequestHeaders(webhookConfig, body); - const response = await fetch(webhookConfig?.url, { - method: "POST", - headers: headers, - body: JSON.stringify(body), - }); - - if (!response.ok) { + try { + const headers = await createWebhookRequestHeaders(webhookConfig, body); + const response = await fetch(webhookConfig?.url, { + method: "POST", + headers: headers, + body: JSON.stringify(body), + }); + + if (!response.ok) { + logger({ + service: "server", + level: "error", + message: `[sendWebhook] Webhook request error: ${response.status} ${response.statusText}`, + }); + + return false; + } + + return true; + } catch (error) { logger({ service: "server", level: "error", - message: `[sendWebhook] Webhook request error: ${response.status} ${response.statusText}`, + message: `[sendWebhook] Webhook request error: ${error}`, + error, }); - return false; } - - return true; }; export const sendTxWebhook = async (queueIds: string[]): Promise => { @@ -83,39 +93,27 @@ export const sendTxWebhook = async (queueIds: string[]): Promise => { return; } else { let webhookConfig: SanitizedWebHooksSchema[] | undefined = - await getWebhookConfig(WebhooksEventTypes.ALL_TX); + await getWebhook(WebhooksEventTypes.ALL_TX); if (!webhookConfig) { switch (txData.status) { case TransactionStatusEnum.Queued: - webhookConfig = await getWebhookConfig( - WebhooksEventTypes.QUEUED_TX, - ); + webhookConfig = await getWebhook(WebhooksEventTypes.QUEUED_TX); break; case TransactionStatusEnum.Submitted: - webhookConfig = await getWebhookConfig( - WebhooksEventTypes.SENT_TX, - ); + webhookConfig = await getWebhook(WebhooksEventTypes.SENT_TX); break; case TransactionStatusEnum.Retried: - webhookConfig = await getWebhookConfig( - WebhooksEventTypes.RETRIED_TX, - ); + webhookConfig = await getWebhook(WebhooksEventTypes.RETRIED_TX); break; case TransactionStatusEnum.Mined: - webhookConfig = await getWebhookConfig( - WebhooksEventTypes.MINED_TX, - ); + webhookConfig = await getWebhook(WebhooksEventTypes.MINED_TX); break; case TransactionStatusEnum.Errored: - webhookConfig = await getWebhookConfig( - WebhooksEventTypes.ERRORED_TX, - ); + webhookConfig = await getWebhook(WebhooksEventTypes.ERRORED_TX); break; case TransactionStatusEnum.Cancelled: - webhookConfig = await getWebhookConfig( - WebhooksEventTypes.ERRORED_TX, - ); + webhookConfig = await getWebhook(WebhooksEventTypes.ERRORED_TX); break; } } @@ -160,7 +158,7 @@ export const sendBalanceWebhook = async ( return; } - const webhookConfig = await getWebhookConfig( + const webhookConfig = await getWebhook( WebhooksEventTypes.BACKEND_WALLET_BALANCE, ); diff --git a/src/utils/cache/getConfig.ts b/src/utils/cache/getConfig.ts new file mode 100644 index 000000000..600be127d --- /dev/null +++ b/src/utils/cache/getConfig.ts @@ -0,0 +1,57 @@ +import { Configuration } from "@prisma/client"; +import { getConfiguration } from "../../db/configuration/getConfiguration"; +import { WalletType } from "../../schema/wallet"; + +const cacheKey = "config"; +interface Config + extends Omit< + Configuration, + | "awsAccessKeyId" + | "awsSecretAccessKey" + | "awsRegion" + | "gcpApplicationProjectId" + | "gcpKmsLocationId" + | "gcpKmsKeyRingId" + | "gcpApplicationCredentialEmail" + | "gcpApplicationCredentialPrivateKey" + > { + walletConfiguration: + | { + type: WalletType.local; + } + | { + type: WalletType.awsKms; + awsAccessKeyId: string; + awsSecretAccessKey: string; + awsRegion: string; + } + | { + type: WalletType.gcpKms; + gcpApplicationProjectId: string; + gcpKmsLocationId: string; + gcpKmsKeyRingId: string; + gcpApplicationCredentialEmail: string; + gcpApplicationCredentialPrivateKey: string; + }; +} + +const configCache = new Map(); + +export const getConfig = async (retrieveFromCache = true): Promise => { + if ( + configCache.has(cacheKey) && + configCache.get(cacheKey) && + retrieveFromCache + ) { + const config = configCache.get(cacheKey) as Config; + + if (config.authDomain && config.authWalletEncryptedJson) { + return config; + } + } + + const configData = await getConfiguration(); + + configCache.set(cacheKey, configData); + return configData; +}; diff --git a/src/utils/cache/getSdk.ts b/src/utils/cache/getSdk.ts index db985a6bb..39e05250c 100644 --- a/src/utils/cache/getSdk.ts +++ b/src/utils/cache/getSdk.ts @@ -2,10 +2,10 @@ import { Static, Type } from "@sinclair/typebox"; import { getChainByChainId } from "@thirdweb-dev/chains"; import { NetworkInput, ThirdwebSDK } from "@thirdweb-dev/sdk"; import * as fs from "fs"; -import { getConfiguration } from "../../db/configuration/getConfiguration"; import { PrismaTransaction } from "../../schema/prisma"; import { isValidHttpUrl } from "../../server/utils/validator"; import { JsonSchema, env } from "../env"; +import { getConfig } from "./getConfig"; import { getWallet } from "./getWallet"; const sdkCache = new Map(); @@ -67,7 +67,7 @@ export const getSdk = async ({ : `${chainId}`; let RPC_OVERRIDES: Static[] = []; - const config = await getConfiguration(); + const config = await getConfig(); const CHAIN_OVERRIDES = config.chainOverrides; if (sdkCache.has(cacheKey)) { diff --git a/src/utils/cache/getWebhook.ts b/src/utils/cache/getWebhook.ts index 304ce333b..b00340ae4 100644 --- a/src/utils/cache/getWebhook.ts +++ b/src/utils/cache/getWebhook.ts @@ -4,14 +4,19 @@ import { WebhooksEventTypes, } from "../../schema/webhooks"; -export const webhookCache = new Map(); +const webhookCache = new Map(); -export const getWebhookConfig = async ( +export const getWebhook = async ( eventType: WebhooksEventTypes, + retrieveFromCache = true, ): Promise => { const cacheKey = eventType; - if (webhookCache.has(cacheKey) && webhookCache.get(cacheKey)) { - return webhookCache.get(cacheKey) as SanitizedWebHooksSchema[]; + if (retrieveFromCache) { + if (webhookCache.has(cacheKey) && webhookCache.get(cacheKey)) { + return webhookCache.get(cacheKey) as SanitizedWebHooksSchema[]; + } else { + return undefined; + } } const webhookConfig = await getAllWebhooks(); @@ -23,6 +28,7 @@ export const getWebhookConfig = async ( }); if (eventTypeWebhookDetails.length === 0) { + webhookCache.delete(cacheKey); return undefined; } diff --git a/src/utils/env.ts b/src/utils/env.ts index 57185c38d..58826ae73 100644 --- a/src/utils/env.ts +++ b/src/utils/env.ts @@ -52,9 +52,9 @@ export const env = createEnv({ .default("debug"), LOG_SERVICES: z .string() - .default("server,worker") + .default("server,worker,cache") .transform((s) => - z.array(z.enum(["server", "worker"])).parse(s.split(",")), + z.array(z.enum(["server", "worker", "cache"])).parse(s.split(",")), ), THIRDWEB_API_SECRET_KEY: z.string().min(1), ADMIN_WALLET_ADDRESS: z.string().min(1), diff --git a/src/worker/index.ts b/src/worker/index.ts index ca95f928c..58b4af836 100644 --- a/src/worker/index.ts +++ b/src/worker/index.ts @@ -1,8 +1,16 @@ +import { + newConfigurationListener, + updatedConfigurationListener, +} from "./listeners/configListener"; import { deleteProcessedTx } from "./listeners/deleteProcessedTx"; import { minedTxListener } from "./listeners/minedTxListener"; import { queuedTxListener } from "./listeners/queuedTxListener"; import { retryTxListener } from "./listeners/retryTxListener"; import { updateTxListener } from "./listeners/updateTxListener"; +import { + newWebhooksListener, + updatedWebhooksListener, +} from "./listeners/webhookListener"; const worker = async () => { // Listen for queued transactions to process @@ -19,6 +27,14 @@ const worker = async () => { // Delete Successfully Processed Transactions which are older than 24 hours await deleteProcessedTx(); + + // Listen for new & updated configuration data + await newConfigurationListener(); + await updatedConfigurationListener(); + + // Listen for new & updated webhooks data + await newWebhooksListener(); + await updatedWebhooksListener(); }; worker(); diff --git a/src/worker/listeners/configListener.ts b/src/worker/listeners/configListener.ts new file mode 100644 index 000000000..e4b122fdc --- /dev/null +++ b/src/worker/listeners/configListener.ts @@ -0,0 +1,116 @@ +import { knex } from "../../db/client"; +import { getConfig } from "../../utils/cache/getConfig"; +import { logger } from "../../utils/logger"; +import { minedTxListener } from "./minedTxListener"; +import { queuedTxListener } from "./queuedTxListener"; +import { retryTxListener } from "./retryTxListener"; + +export const newConfigurationListener = async (): Promise => { + logger({ + service: "worker", + level: "info", + message: `Listening for new configuration data`, + }); + + // TODO: This doesn't even need to be a listener + const connection = await knex.client.acquireConnection(); + connection.query(`LISTEN new_configuration_data`); + + // Whenever we receive a new transaction, process it + connection.on( + "notification", + async (msg: { channel: string; payload: string }) => { + // Update Configs Data + await getConfig(false); + }, + ); + + connection.on("end", async () => { + await knex.destroy(); + knex.client.releaseConnection(connection); + + logger({ + service: "worker", + level: "info", + message: `Released database connection on end`, + }); + }); + + connection.on("error", async (err: any) => { + logger({ + service: "worker", + level: "error", + message: `Database connection error`, + error: err, + }); + + await knex.destroy(); + knex.client.releaseConnection(connection); + + logger({ + service: "worker", + level: "info", + message: `Released database connection on error`, + error: err, + }); + }); +}; + +export const updatedConfigurationListener = async (): Promise => { + logger({ + service: "worker", + level: "info", + message: `Listening for updated configuration data`, + }); + + // TODO: This doesn't even need to be a listener + const connection = await knex.client.acquireConnection(); + connection.query(`LISTEN updated_configuration_data`); + + // Whenever we receive a new transaction, process it + connection.on( + "notification", + async (msg: { channel: string; payload: string }) => { + // Update Configs Data + logger({ + service: "worker", + level: "info", + message: `Updated configuration data`, + }); + await getConfig(false); + await queuedTxListener(); + await minedTxListener(); + await retryTxListener(); + }, + ); + + connection.on("end", async () => { + await knex.destroy(); + knex.client.releaseConnection(connection); + + logger({ + service: "worker", + level: "info", + message: `Released database connection on end`, + }); + }); + + connection.on("error", async (err: any) => { + logger({ + service: "worker", + level: "error", + message: `Database connection error`, + error: err, + }); + + await knex.destroy(); + knex.client.releaseConnection(connection); + + logger({ + service: "worker", + level: "info", + message: `Released database connection on error`, + error: err, + }); + }); +}; diff --git a/src/worker/listeners/minedTxListener.ts b/src/worker/listeners/minedTxListener.ts index cba83b771..ca32411cb 100644 --- a/src/worker/listeners/minedTxListener.ts +++ b/src/worker/listeners/minedTxListener.ts @@ -1,16 +1,21 @@ import cron from "node-cron"; -import { getConfiguration } from "../../db/configuration/getConfiguration"; +import { getConfig } from "../../utils/cache/getConfig"; import { updateMinedTx } from "../tasks/updateMinedTx"; import { updateMinedUserOps } from "../tasks/updateMinedUserOps"; +let task: cron.ScheduledTask; export const minedTxListener = async () => { - const config = await getConfiguration(); + const config = await getConfig(); if (!config.minedTxListenerCronSchedule) { return; } - cron.schedule(config.minedTxListenerCronSchedule, async () => { + if (task) { + task.stop(); + } + + task = cron.schedule(config.minedTxListenerCronSchedule, async () => { await updateMinedTx(); await updateMinedUserOps(); }); diff --git a/src/worker/listeners/queuedTxListener.ts b/src/worker/listeners/queuedTxListener.ts index a919fc105..fa5a1ea39 100644 --- a/src/worker/listeners/queuedTxListener.ts +++ b/src/worker/listeners/queuedTxListener.ts @@ -1,9 +1,11 @@ import cron from "node-cron"; -import { getConfiguration } from "../../db/configuration/getConfiguration"; +import { getConfig } from "../../utils/cache/getConfig"; import { logger } from "../../utils/logger"; import { processTx } from "../tasks/processTx"; let processTxStarted = false; +let task: cron.ScheduledTask; + export const queuedTxListener = async (): Promise => { logger({ service: "worker", @@ -11,13 +13,16 @@ export const queuedTxListener = async (): Promise => { message: `Listening for queued transactions`, }); - const config = await getConfiguration(); + const config = await getConfig(); if (!config.minedTxListenerCronSchedule) { return; } + if (task) { + task.stop(); + } - cron.schedule(config.minedTxListenerCronSchedule, async () => { + task = cron.schedule(config.minedTxListenerCronSchedule, async () => { if (!processTxStarted) { processTxStarted = true; await processTx(); diff --git a/src/worker/listeners/retryTxListener.ts b/src/worker/listeners/retryTxListener.ts index b58527176..bfc345fa1 100644 --- a/src/worker/listeners/retryTxListener.ts +++ b/src/worker/listeners/retryTxListener.ts @@ -1,15 +1,20 @@ import cron from "node-cron"; -import { getConfiguration } from "../../db/configuration/getConfiguration"; +import { getConfig } from "../../utils/cache/getConfig"; import { retryTx } from "../tasks/retryTx"; +let task: cron.ScheduledTask; export const retryTxListener = async () => { - const config = await getConfiguration(); + const config = await getConfig(); if (!config.retryTxListenerCronSchedule) { return; } - cron.schedule(config.retryTxListenerCronSchedule, async () => { + if (task) { + task.stop(); + } + + task = cron.schedule(config.retryTxListenerCronSchedule, async () => { await retryTx(); }); }; diff --git a/src/worker/listeners/webhookListener.ts b/src/worker/listeners/webhookListener.ts new file mode 100644 index 000000000..857acbb12 --- /dev/null +++ b/src/worker/listeners/webhookListener.ts @@ -0,0 +1,120 @@ +import { knex } from "../../db/client"; +import { WebhooksEventTypes } from "../../schema/webhooks"; +import { getWebhook } from "../../utils/cache/getWebhook"; +import { logger } from "../../utils/logger"; + +export const newWebhooksListener = async (): Promise => { + logger({ + service: "worker", + level: "info", + message: `Listening for new webhooks data`, + }); + + // TODO: This doesn't even need to be a listener + const connection = await knex.client.acquireConnection(); + connection.query(`LISTEN new_webhook_data`); + + // Whenever we receive a new transaction, process it + connection.on( + "notification", + async (msg: { channel: string; payload: string }) => { + logger({ + service: "worker", + level: "info", + message: `Received new webhooks data`, + }); + // Update Webhooks Data + for (const eventType of Object.values(WebhooksEventTypes)) { + await getWebhook(eventType, false); + } + }, + ); + + connection.on("end", async () => { + await knex.destroy(); + knex.client.releaseConnection(connection); + + logger({ + service: "worker", + level: "info", + message: `Released database connection on end`, + }); + }); + + connection.on("error", async (err: any) => { + logger({ + service: "worker", + level: "error", + message: `Database connection error`, + error: err, + }); + + await knex.destroy(); + knex.client.releaseConnection(connection); + + logger({ + service: "worker", + level: "info", + message: `Released database connection on error`, + error: err, + }); + }); +}; + +export const updatedWebhooksListener = async (): Promise => { + logger({ + service: "worker", + level: "info", + message: `Listening for updated webhooks data`, + }); + + // TODO: This doesn't even need to be a listener + const connection = await knex.client.acquireConnection(); + connection.query(`LISTEN updated_webhook_data`); + + // Whenever we receive a new transaction, process it + connection.on( + "notification", + async (msg: { channel: string; payload: string }) => { + // Update Configs Data + logger({ + service: "worker", + level: "info", + message: `Received updated webhooks data`, + }); + for (const eventType of Object.values(WebhooksEventTypes)) { + await getWebhook(eventType, false); + } + }, + ); + + connection.on("end", async () => { + await knex.destroy(); + knex.client.releaseConnection(connection); + + logger({ + service: "worker", + level: "info", + message: `Released database connection on end`, + }); + }); + + connection.on("error", async (err: any) => { + logger({ + service: "worker", + level: "error", + message: `Database connection error`, + error: err, + }); + + await knex.destroy(); + knex.client.releaseConnection(connection); + + logger({ + service: "worker", + level: "info", + message: `Released database connection on error`, + error: err, + }); + }); +}; diff --git a/src/worker/tasks/processTx.ts b/src/worker/tasks/processTx.ts index 4b4284adb..23eaf50d6 100644 --- a/src/worker/tasks/processTx.ts +++ b/src/worker/tasks/processTx.ts @@ -9,7 +9,6 @@ import { ethers } from "ethers"; import { BigNumber } from "ethers/lib/ethers"; import { RpcResponse } from "viem/_types/utils/rpc"; import { prisma } from "../../db/client"; -import { getConfiguration } from "../../db/configuration/getConfiguration"; import { getQueuedTxs } from "../../db/transactions/getQueuedTxs"; import { updateTx } from "../../db/transactions/updateTx"; import { getWalletNonce } from "../../db/wallets/getWalletNonce"; @@ -20,6 +19,7 @@ import { transactionResponseSchema, } from "../../server/schemas/transaction"; import { sendBalanceWebhook, sendTxWebhook } from "../../server/utils/webhook"; +import { getConfig } from "../../utils/cache/getConfig"; import { getSdk } from "../../utils/cache/getSdk"; import { env } from "../../utils/env"; import { logger } from "../../utils/logger"; @@ -63,7 +63,7 @@ export const processTx = async () => { message: `Received ${txs.length} transactions to process`, }); - const config = await getConfiguration(); + const config = await getConfig(); if (txs.length < config.minTxsToProcess) { return; } diff --git a/src/worker/tasks/retryTx.ts b/src/worker/tasks/retryTx.ts index fefdeaea2..c7d4fd1e1 100644 --- a/src/worker/tasks/retryTx.ts +++ b/src/worker/tasks/retryTx.ts @@ -1,10 +1,10 @@ import { getDefaultGasOverrides } from "@thirdweb-dev/sdk"; import { ethers } from "ethers"; import { prisma } from "../../db/client"; -import { getConfiguration } from "../../db/configuration/getConfiguration"; import { getTxToRetry } from "../../db/transactions/getTxToRetry"; import { updateTx } from "../../db/transactions/updateTx"; import { TransactionStatusEnum } from "../../server/schemas/transaction"; +import { getConfig } from "../../utils/cache/getConfig"; import { getSdk } from "../../utils/cache/getSdk"; import { logger } from "../../utils/logger"; @@ -18,7 +18,7 @@ export const retryTx = async () => { return; } - const config = await getConfiguration(); + const config = await getConfig(); const sdk = await getSdk({ chainId: parseInt(tx.chainId!), walletAddress: tx.fromAddress!,