diff --git a/.github/workflows/godwoken-tests.yml b/.github/workflows/godwoken-tests.yml index 1ed3cfea..4e90ca29 100644 --- a/.github/workflows/godwoken-tests.yml +++ b/.github/workflows/godwoken-tests.yml @@ -17,3 +17,5 @@ jobs: MANUAL_BUILD_WEB3_INDEXER=true WEB3_GIT_URL=https://github.com/${{ github.repository }} WEB3_GIT_CHECKOUT=${{ github.ref }} + GODWOKEN_KICKER_REPO=godwokenrises/godwoken-kicker + GODWOKEN_KICKER_REF=1ba9ec08bf940e7222931ccc2940159dc877d1b4 diff --git a/.github/workflows/unit-tests.yml b/.github/workflows/unit-tests.yml index b250e430..c161e4f5 100644 --- a/.github/workflows/unit-tests.yml +++ b/.github/workflows/unit-tests.yml @@ -24,7 +24,7 @@ jobs: - uses: actions/checkout@v3 with: repository: godwokenrises/godwoken-kicker - ref: 'develop' + ref: '1ba9ec08bf940e7222931ccc2940159dc877d1b4' - name: Kicker init run: ./kicker init - name: Kicker start diff --git a/README.md b/README.md index 538ee735..0289fc2e 100644 --- a/README.md +++ b/README.md @@ -60,6 +60,7 @@ rate limit config ```bash $ cat > ./packages/api-server/rate-limit-config.json < 1) { + await this.store.incrBy(id, offset); + } else { + await this.store.incr(id); + } } } @@ -90,13 +98,17 @@ export class AccessGuard { return true; } - async isOverRate(rpcMethod: string, reqId: string): Promise { + async isOverRate( + rpcMethod: string, + reqId: string, + offset: number = 1 + ): Promise { const id = getId(rpcMethod, reqId); const data = await this.store.get(id); if (data == null) return false; if (this.rpcMethods[rpcMethod] == null) return false; - const count = +data; + const count = +data + offset; const maxNumber = this.rpcMethods[rpcMethod]; if (count > maxNumber) { return true; diff --git a/packages/api-server/src/cache/store.ts b/packages/api-server/src/cache/store.ts index 72577fec..ff425362 100644 --- a/packages/api-server/src/cache/store.ts +++ b/packages/api-server/src/cache/store.ts @@ -55,6 +55,17 @@ export class Store { return await this.client.incr(key); } + async incrBy(key: string, offset: number) { + const data = await this.client.get(key); + if (data == null) { + throw new Error("can not update before key exits"); + } + if (isNaN(data as any)) { + throw new Error("can not update with NaN value"); + } + return await this.client.incrBy(key, offset); + } + async ttl(key: string) { return await this.client.ttl(key); } diff --git a/packages/api-server/src/rate-limit.ts b/packages/api-server/src/rate-limit.ts index 5c9b352e..14c2518a 100644 --- a/packages/api-server/src/rate-limit.ts +++ b/packages/api-server/src/rate-limit.ts @@ -6,7 +6,47 @@ import { JSONRPCError } from "jayson"; export const accessGuard = new AccessGuard(); -export async function wsApplyRateLimitByIp(req: Request, method: string) { +export async function wsApplyBatchRateLimitByIp( + req: Request, + objs: any[] +): Promise { + const ip = getIp(req); + const methods = Object.keys(accessGuard.rpcMethods); + for (const targetMethod of methods) { + const count = calcMethodCount(objs, targetMethod); + if (count > 0 && ip != null) { + const isExist = await accessGuard.isExist(targetMethod, ip); + if (!isExist) { + await accessGuard.add(targetMethod, ip); + } + + const isOverRate = await accessGuard.isOverRate(targetMethod, ip, count); + if (isOverRate) { + const remainSecs = await accessGuard.getKeyTTL(targetMethod, ip); + const message = `Too Many Requests, IP: ${ip}, please wait ${remainSecs}s and retry. RPC method: ${targetMethod}.`; + const error: JSONRPCError = { + code: LIMIT_EXCEEDED, + message: message, + }; + + logger.debug( + `Rate Limit Exceed, ip: ${ip}, method: ${targetMethod}, ttl: ${remainSecs}s` + ); + + return new Array(objs.length).fill(error); + } else { + await accessGuard.updateCount(targetMethod, ip, count); + } + } + + return undefined; + } +} + +export async function wsApplyRateLimitByIp( + req: Request, + method: string +): Promise { const ip = getIp(req); const methods = Object.keys(accessGuard.rpcMethods); if (methods.includes(method) && ip != null) { @@ -23,6 +63,11 @@ export async function applyRateLimitByIp( res: Response, next: NextFunction ) { + // check batch limit + if (batchLimit(req, res)) { + return; + } + const methods = Object.keys(accessGuard.rpcMethods); if (methods.length === 0) { return next(); @@ -45,6 +90,54 @@ export async function applyRateLimitByIp( } } +export function batchLimit(req: Request, res: Response) { + let isBan = false; + if (isBatchLimit(req.body)) { + isBan = true; + // if reach batch limit, we reject the whole req with error + const message = `Too Many Batch Requests ${req.body.length}, limit: ${accessGuard.batchLimit}.`; + const error = { + code: LIMIT_EXCEEDED, + message: message, + }; + + logger.debug( + `Batch Limit Exceed, ${req.body.length}, limit: ${accessGuard.batchLimit}` + ); + + const content = req.body.map((b: any) => { + return { + jsonrpc: "2.0", + id: b.id, + error: error, + }; + }); + + const httpRateLimitCode = 429; + res.status(httpRateLimitCode).send(content); + } + return isBan; +} + +export function wsBatchLimit(body: any): JSONRPCError[] | undefined { + if (isBatchLimit(body)) { + // if reach batch limit, we reject the whole req with error + const message = `Too Many Batch Requests ${body.length}, limit: ${accessGuard.batchLimit}.`; + const error: JSONRPCError = { + code: LIMIT_EXCEEDED, + message: message, + }; + + logger.debug( + `WS Batch Limit Exceed, ${body.length}, limit: ${accessGuard.batchLimit}` + ); + + return new Array(body.length).fill(error); + } + + return undefined; +} + export async function rateLimit( req: Request, res: Response, @@ -52,13 +145,14 @@ export async function rateLimit( reqId: string | undefined ) { let isBan = false; - if (hasMethod(req.body, rpcMethod) && reqId != null) { + const count = calcMethodCount(req.body, rpcMethod); + if (count > 0 && reqId != null) { const isExist = await accessGuard.isExist(rpcMethod, reqId); if (!isExist) { await accessGuard.add(rpcMethod, reqId); } - const isOverRate = await accessGuard.isOverRate(rpcMethod, reqId); + const isOverRate = await accessGuard.isOverRate(rpcMethod, reqId, count); if (isOverRate) { isBan = true; @@ -94,7 +188,7 @@ export async function rateLimit( }; res.status(httpRateLimitCode).header(httpRateLimitHeader).send(content); } else { - await accessGuard.updateCount(rpcMethod, reqId); + await accessGuard.updateCount(rpcMethod, reqId, count); } } return isBan; @@ -120,7 +214,7 @@ export async function wsRateLimit( }; logger.debug( - `Rate Limit Exceed, ip: ${reqId}, method: ${rpcMethod}, ttl: ${remainSecs}s` + `WS Rate Limit Exceed, ip: ${reqId}, method: ${rpcMethod}, ttl: ${remainSecs}s` ); return { error, remainSecs }; } else { @@ -129,12 +223,19 @@ export async function wsRateLimit( return undefined; } -export function hasMethod(body: any, name: string) { +export function isBatchLimit(body: any) { + if (Array.isArray(body)) { + return body.length >= accessGuard.batchLimit; + } + return false; +} + +export function calcMethodCount(body: any, targetMethod: string): number { if (Array.isArray(body)) { - return body.map((b) => b.method).includes(name); + return body.filter((b) => b.method === targetMethod).length; } - return body.method === name; + return body.method === targetMethod ? 1 : 0; } export function getIp(req: Request) { diff --git a/packages/api-server/src/ws/methods.ts b/packages/api-server/src/ws/methods.ts index b491f8a4..f3ccbbf2 100644 --- a/packages/api-server/src/ws/methods.ts +++ b/packages/api-server/src/ws/methods.ts @@ -12,7 +12,11 @@ import { Log, LogQueryOption, toApiLog } from "../db/types"; import { filterLogsByAddress, filterLogsByTopics, Query } from "../db"; import { Store } from "../cache/store"; import { CACHE_EXPIRED_TIME_MILSECS } from "../cache/constant"; -import { wsApplyRateLimitByIp } from "../rate-limit"; +import { + wsApplyBatchRateLimitByIp, + wsApplyRateLimitByIp, + wsBatchLimit, +} from "../rate-limit"; import { gwTxHashToEthTxHash } from "../cache/tx-hash"; import { isInstantFinalityHackMode } from "../util"; @@ -73,16 +77,29 @@ export function wrapper(ws: any, req: any) { const callback = (err: any, result: any) => { return { err, result }; }; + + // check batch limit + const errs = wsBatchLimit(objs); + if (errs != null) { + return cb( + errs.map((err) => { + return { err }; + }) + ); + } + + // check batch rate limit + const batchErrs = await wsApplyBatchRateLimitByIp(req, objs); + if (batchErrs != null) { + return cb( + batchErrs.map((err) => { + return { err }; + }) + ); + } + const info = await Promise.all( objs.map(async (obj) => { - // check rate limit - const err = await wsApplyRateLimitByIp(req, obj.method); - if (err != null) { - return { - err, - }; - } - if (obj.method === "eth_subscribe") { const r = ethSubscribe(obj.params, callback); return r;