Skip to content

Commit

Permalink
Added Webhook Config to DB & updated logic to cache data & reset when…
Browse files Browse the repository at this point in the history
… config updated (#233)
  • Loading branch information
farhanW3 authored Oct 17, 2023
1 parent 4dcaf67 commit 66052b3
Show file tree
Hide file tree
Showing 11 changed files with 140 additions and 33 deletions.
38 changes: 38 additions & 0 deletions server/api/configuration/webhooks/get.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import { Static, Type } from "@sinclair/typebox";
import { FastifyInstance } from "fastify";
import { StatusCodes } from "http-status-codes";
import { getConfiguration } from "../../../../src/db/configuration/getConfiguration";

export const ReplySchema = Type.Object({
result: Type.Object({
webhookUrl: Type.String(),
webhookAuthBearerToken: Type.String(),
}),
});

export async function getWebhookConfiguration(fastify: FastifyInstance) {
fastify.route<{
Reply: Static<typeof ReplySchema>;
}>({
method: "GET",
url: "/configuration/webhook",
schema: {
summary: "Get webhook configuration",
description: "Get the engine configuration for webhook",
tags: ["Configuration"],
operationId: "getWebhookConfiguration",
response: {
[StatusCodes.OK]: ReplySchema,
},
},
handler: async (req, res) => {
const config = await getConfiguration();
res.status(200).send({
result: {
webhookAuthBearerToken: config.webhookAuthBearerToken || "",
webhookUrl: config.webhookUrl || "",
},
});
},
});
}
40 changes: 40 additions & 0 deletions server/api/configuration/webhooks/update.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { Static, Type } from "@sinclair/typebox";
import { FastifyInstance } from "fastify";
import { StatusCodes } from "http-status-codes";
import { updateConfiguration } from "../../../../src/db/configuration/updateConfiguration";
import { ReplySchema } from "./get";

const BodySchema = Type.Partial(
Type.Object({
webhookUrl: Type.String(),
webhookAuthBearerToken: Type.String(),
}),
);

export async function updateWebhookConfiguration(fastify: FastifyInstance) {
fastify.route<{
Body: Static<typeof BodySchema>;
}>({
method: "POST",
url: "/configuration/webhook",
schema: {
summary: "Update webhook configuration",
description: "Update the engine configuration for webhook",
tags: ["Configuration"],
operationId: "updateWebhookConfiguration",
body: BodySchema,
response: {
[StatusCodes.OK]: ReplySchema,
},
},
handler: async (req, res) => {
const config = await updateConfiguration({ ...req.body });
res.status(200).send({
result: {
webhookUrl: config.webhookUrl,
webhookAuthBearerToken: config.webhookAuthBearerToken,
},
});
},
});
}
8 changes: 8 additions & 0 deletions server/api/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,18 @@ import { getBalance } from "./backend-wallet/getBalance";
import { importWallet } from "./backend-wallet/import";
import { sendTransaction } from "./backend-wallet/send";
import { transfer } from "./backend-wallet/transfer";

// Configuration
import { getChainsConfiguration } from "./configuration/chains/get";
import { updateChainsConfiguration } from "./configuration/chains/update";
import { getTransactionConfiguration } from "./configuration/transactions/get";
import { updateTransactionConfiguration } from "./configuration/transactions/update";
import { getWalletsConfiguration } from "./configuration/wallets/get";
import { updateWalletsConfiguration } from "./configuration/wallets/update";
import { getWebhookConfiguration } from "./configuration/webhooks/get";
import { updateWebhookConfiguration } from "./configuration/webhooks/update";

// Accounts
import { accountRoutes } from "./contract/extensions/account";
import { accountFactoryRoutes } from "./contract/extensions/accountFactory";

Expand All @@ -74,6 +80,8 @@ export const apiRoutes = async (fastify: FastifyInstance) => {
await fastify.register(updateChainsConfiguration);
await fastify.register(getTransactionConfiguration);
await fastify.register(updateTransactionConfiguration);
await fastify.register(getWebhookConfiguration);
await fastify.register(updateWebhookConfiguration);

// Chains
await fastify.register(getChainData);
Expand Down
9 changes: 1 addition & 8 deletions server/controller/tx-update-listener.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { knex } from "../../src/db/client";
import { getTxById } from "../../src/db/transactions/getTxById";
import { env } from "../../src/utils/env";
import { logger } from "../../src/utils/logger";
import {
formatSocketMessage,
Expand All @@ -23,13 +22,7 @@ export const startTxUpdatesNotificationListener = async (): Promise<void> => {
const parsedPayload = JSON.parse(msg.payload);

// Send webhook
if (env.WEBHOOK_URL.length > 0) {
await sendWebhook(parsedPayload);
} else {
logger.server.debug(
`Webhooks are disabled or no URL is provided. Skipping webhook update`,
);
}
await sendWebhook(parsedPayload);

// Send websocket message
const index = subscriptionsData.findIndex(
Expand Down
15 changes: 11 additions & 4 deletions server/utilities/webhook.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
import { getTxById } from "../../src/db/transactions/getTxById";
import { env } from "../../src/utils/env";
import { logger } from "../../src/utils/logger";
import { getWebhookConfig } from "../utils/cache/getWebhookConfig";

export const sendWebhook = async (data: any): Promise<void> => {
try {
const webhookConfig = await getWebhookConfig();

if (!webhookConfig.webhookUrl) {
logger.server.debug("No WebhookURL set, skipping webhook send");
return;
}

const txData = await getTxById({ queueId: data.id });
const headers: {
Accept: string;
Expand All @@ -14,13 +21,13 @@ export const sendWebhook = async (data: any): Promise<void> => {
"Content-Type": "application/json",
};

if (process.env.WEBHOOK_AUTH_BEARER_TOKEN) {
if (webhookConfig.webhookAuthBearerToken) {
headers[
"Authorization"
] = `Bearer ${process.env.WEBHOOK_AUTH_BEARER_TOKEN}`;
] = `Bearer ${webhookConfig.webhookAuthBearerToken}`;
}

const response = await fetch(env.WEBHOOK_URL, {
const response = await fetch(webhookConfig.webhookUrl, {
method: "POST",
headers,
body: JSON.stringify(txData),
Expand Down
29 changes: 29 additions & 0 deletions server/utils/cache/getWebhookConfig.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import { getConfiguration } from "../../../src/db/configuration/getConfiguration";

interface WebhookConfig {
webhookUrl: string;
webhookAuthBearerToken: string | null;
}

export const webhookCache = new Map<string, WebhookConfig>();

export const getWebhookConfig = async (): Promise<WebhookConfig> => {
const cacheKey = `webhookConfig`;
if (webhookCache.has(cacheKey)) {
return webhookCache.get(cacheKey)! as WebhookConfig;
}

const config = await getConfiguration();

if (config.webhookAuthBearerToken || config.webhookUrl) {
webhookCache.set(cacheKey, {
webhookUrl: config.webhookUrl!,
webhookAuthBearerToken: config.webhookAuthBearerToken,
});
}

return {
webhookUrl: config.webhookUrl!,
webhookAuthBearerToken: config.webhookAuthBearerToken,
};
};
5 changes: 5 additions & 0 deletions src/db/configuration/updateConfiguration.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Prisma } from "@prisma/client";
import { webhookCache } from "../../../server/utils/cache/getWebhookConfig";
import { encrypt } from "../../utils/cypto";
import { prisma } from "../client";
import { getConfiguration } from "./getConfiguration";
Expand All @@ -7,6 +8,10 @@ export const updateConfiguration = async (
data: Prisma.ConfigurationUpdateArgs["data"],
) => {
const config = await getConfiguration();

// Clearing webhook cache on update
webhookCache.clear();

return prisma.configuration.update({
where: {
id: config.id,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-- AlterTable
ALTER TABLE "configuration" ADD COLUMN "webhookAuthBearerToken" TEXT,
ADD COLUMN "webhookUrl" TEXT;
3 changes: 3 additions & 0 deletions src/prisma/schema.prisma
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ model Configuration {
gcpKmsKeyRingId String? @map("gcpKmsKeyRingId")
gcpApplicationCredentialEmail String? @map("gcpApplicationCredentialEmail")
gcpApplicationCredentialPrivateKey String? @map("gcpApplicationCredentialPrivateKey")
// Webhook
webhookUrl String? @map("webhookUrl")
webhookAuthBearerToken String? @map("webhookAuthBearerToken")
@@map("configuration")
}
Expand Down
12 changes: 0 additions & 12 deletions src/utils/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,6 @@ export const env = createEnv({
PORT: z.coerce.number().default(3005),
HOST: z.string().default("0.0.0.0"),
ACCESS_CONTROL_ALLOW_ORIGIN: z.string().default("*"),
WEBHOOK_URL: z
.string()
.default("")
.transform((url) => {
if (url.length > 0) {
return url;
}
return "";
}),
WEBHOOK_AUTH_BEARER_TOKEN: z.string().default(""),
},
clientPrefix: "NEVER_USED",
client: {},
Expand All @@ -77,8 +67,6 @@ export const env = createEnv({
HOST: process.env.HOST,
OPENAPI_BASE_ORIGIN: process.env.OPENAPI_BASE_ORIGIN,
ACCESS_CONTROL_ALLOW_ORIGIN: process.env.ACCESS_CONTROL_ALLOW_ORIGIN,
WEBHOOK_URL: process.env.WEBHOOK_URL,
WEBHOOK_AUTH_BEARER_TOKEN: process.env.WEBHOOK_AUTH_BEARER_TOKEN,
},
onValidationError: (error: ZodError) => {
console.error(
Expand Down
11 changes: 2 additions & 9 deletions src/worker/listeners/queuedTxListener.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import PQueue from "p-queue";
import { sendWebhook } from "../../../server/utilities/webhook";
import { knex } from "../../db/client";
import { env } from "../../utils/env";
import { logger } from "../../utils/logger";
import { processTx } from "../tasks/processTx";

Expand Down Expand Up @@ -31,14 +30,8 @@ export const queuedTxListener = async (): Promise<void> => {
connection.on(
"notification",
async (msg: { channel: string; payload: string }) => {
if (env.WEBHOOK_URL.length > 0) {
const parsedPayload = JSON.parse(msg.payload);
await sendWebhook(parsedPayload);
} else {
logger.server.debug(
`Webhooks are disabled or no URL is provided. Skipping webhook update`,
);
}
const parsedPayload = JSON.parse(msg.payload);
await sendWebhook(parsedPayload);
queue.add(processTx);
},
);
Expand Down

0 comments on commit 66052b3

Please sign in to comment.