Skip to content
This repository has been archived by the owner on Mar 24, 2023. It is now read-only.

Commit

Permalink
Merge pull request #597 from godwokenrises/1.10-batch-limit
Browse files Browse the repository at this point in the history
feat(1.10-rc): check batch request limit and RPC method rate limit
  • Loading branch information
RetricSu authored Jan 9, 2023
2 parents 96407f9 + 8098f8c commit 72dcbd0
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 22 deletions.
2 changes: 2 additions & 0 deletions .github/workflows/godwoken-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,5 @@ jobs:
MANUAL_BUILD_WEB3_INDEXER=true
WEB3_GIT_URL=https://github.com/${{ github.repository }}
WEB3_GIT_CHECKOUT=${{ github.ref }}
GODWOKEN_KICKER_REPO=godwokenrises/godwoken-kicker
GODWOKEN_KICKER_REF=1ba9ec08bf940e7222931ccc2940159dc877d1b4
2 changes: 1 addition & 1 deletion .github/workflows/unit-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
- uses: actions/checkout@v3
with:
repository: godwokenrises/godwoken-kicker
ref: 'develop'
ref: '1ba9ec08bf940e7222931ccc2940159dc877d1b4'
- name: Kicker init
run: ./kicker init
- name: Kicker start
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ rate limit config
```bash
$ cat > ./packages/api-server/rate-limit-config.json <<EOF
{
"batch_limit": 1000,
"expired_time_milsec": 60000,
"methods": {
"poly_executeRawL2Transaction": 30,
Expand Down
20 changes: 16 additions & 4 deletions packages/api-server/src/cache/guard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,10 @@ const configPath = path.resolve(__dirname, "../../rate-limit-config.json");

export const EXPIRED_TIME_MILSECS = 1 * 60 * 1000; // milsec, default 1 minutes
export const MAX_REQUEST_COUNT = 30;
export const BATCH_LIMIT = 100000; // 100_000 RPCs in single batch req

export interface RateLimitConfig {
batch_limit: number;
expired_time_milsec: number;
methods: RpcMethodLimit;
}
Expand All @@ -39,6 +41,7 @@ export class AccessGuard {
public store: Store;
public rpcMethods: RpcMethodLimit;
public expiredTimeMilsecs: number;
public batchLimit: number;

constructor(
enableExpired = true,
Expand All @@ -51,6 +54,7 @@ export class AccessGuard {
this.store = store || new Store(enableExpired, expiredTimeMilsecs);
this.rpcMethods = config.methods;
this.expiredTimeMilsecs = expiredTimeMilsecs || CACHE_EXPIRED_TIME_MILSECS;
this.batchLimit = config.batch_limit || BATCH_LIMIT;
}

async setMaxReqLimit(rpcMethod: string, maxReqCount: number) {
Expand All @@ -75,11 +79,15 @@ export class AccessGuard {
}
}

async updateCount(rpcMethod: string, reqId: string) {
async updateCount(rpcMethod: string, reqId: string, offset: number = 1) {
const isExist = await this.isExist(rpcMethod, reqId);
if (isExist === true) {
const id = getId(rpcMethod, reqId);
await this.store.incr(id);
if (offset > 1) {
await this.store.incrBy(id, offset);
} else {
await this.store.incr(id);
}
}
}

Expand All @@ -90,13 +98,17 @@ export class AccessGuard {
return true;
}

async isOverRate(rpcMethod: string, reqId: string): Promise<boolean> {
async isOverRate(
rpcMethod: string,
reqId: string,
offset: number = 1
): Promise<boolean> {
const id = getId(rpcMethod, reqId);
const data = await this.store.get(id);
if (data == null) return false;
if (this.rpcMethods[rpcMethod] == null) return false;

const count = +data;
const count = +data + offset;
const maxNumber = this.rpcMethods[rpcMethod];
if (count > maxNumber) {
return true;
Expand Down
11 changes: 11 additions & 0 deletions packages/api-server/src/cache/store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,17 @@ export class Store {
return await this.client.incr(key);
}

async incrBy(key: string, offset: number) {
const data = await this.client.get(key);
if (data == null) {
throw new Error("can not update before key exits");
}
if (isNaN(data as any)) {
throw new Error("can not update with NaN value");
}
return await this.client.incrBy(key, offset);
}

async ttl(key: string) {
return await this.client.ttl(key);
}
Expand Down
117 changes: 109 additions & 8 deletions packages/api-server/src/rate-limit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,47 @@ import { JSONRPCError } from "jayson";

export const accessGuard = new AccessGuard();

export async function wsApplyRateLimitByIp(req: Request, method: string) {
export async function wsApplyBatchRateLimitByIp(
req: Request,
objs: any[]
): Promise<JSONRPCError[] | undefined> {
const ip = getIp(req);
const methods = Object.keys(accessGuard.rpcMethods);
for (const targetMethod of methods) {
const count = calcMethodCount(objs, targetMethod);
if (count > 0 && ip != null) {
const isExist = await accessGuard.isExist(targetMethod, ip);
if (!isExist) {
await accessGuard.add(targetMethod, ip);
}

const isOverRate = await accessGuard.isOverRate(targetMethod, ip, count);
if (isOverRate) {
const remainSecs = await accessGuard.getKeyTTL(targetMethod, ip);
const message = `Too Many Requests, IP: ${ip}, please wait ${remainSecs}s and retry. RPC method: ${targetMethod}.`;
const error: JSONRPCError = {
code: LIMIT_EXCEEDED,
message: message,
};

logger.debug(
`Rate Limit Exceed, ip: ${ip}, method: ${targetMethod}, ttl: ${remainSecs}s`
);

return new Array(objs.length).fill(error);
} else {
await accessGuard.updateCount(targetMethod, ip, count);
}
}

return undefined;
}
}

export async function wsApplyRateLimitByIp(
req: Request,
method: string
): Promise<JSONRPCError | undefined> {
const ip = getIp(req);
const methods = Object.keys(accessGuard.rpcMethods);
if (methods.includes(method) && ip != null) {
Expand All @@ -23,6 +63,11 @@ export async function applyRateLimitByIp(
res: Response,
next: NextFunction
) {
// check batch limit
if (batchLimit(req, res)) {
return;
}

const methods = Object.keys(accessGuard.rpcMethods);
if (methods.length === 0) {
return next();
Expand All @@ -45,20 +90,69 @@ export async function applyRateLimitByIp(
}
}

export function batchLimit(req: Request, res: Response) {
let isBan = false;
if (isBatchLimit(req.body)) {
isBan = true;
// if reach batch limit, we reject the whole req with error
const message = `Too Many Batch Requests ${req.body.length}, limit: ${accessGuard.batchLimit}.`;
const error = {
code: LIMIT_EXCEEDED,
message: message,
};

logger.debug(
`Batch Limit Exceed, ${req.body.length}, limit: ${accessGuard.batchLimit}`
);

const content = req.body.map((b: any) => {
return {
jsonrpc: "2.0",
id: b.id,
error: error,
};
});

const httpRateLimitCode = 429;
res.status(httpRateLimitCode).send(content);
}
return isBan;
}

export function wsBatchLimit(body: any): JSONRPCError[] | undefined {
if (isBatchLimit(body)) {
// if reach batch limit, we reject the whole req with error
const message = `Too Many Batch Requests ${body.length}, limit: ${accessGuard.batchLimit}.`;
const error: JSONRPCError = {
code: LIMIT_EXCEEDED,
message: message,
};

logger.debug(
`WS Batch Limit Exceed, ${body.length}, limit: ${accessGuard.batchLimit}`
);

return new Array(body.length).fill(error);
}

return undefined;
}

export async function rateLimit(
req: Request,
res: Response,
rpcMethod: string,
reqId: string | undefined
) {
let isBan = false;
if (hasMethod(req.body, rpcMethod) && reqId != null) {
const count = calcMethodCount(req.body, rpcMethod);
if (count > 0 && reqId != null) {
const isExist = await accessGuard.isExist(rpcMethod, reqId);
if (!isExist) {
await accessGuard.add(rpcMethod, reqId);
}

const isOverRate = await accessGuard.isOverRate(rpcMethod, reqId);
const isOverRate = await accessGuard.isOverRate(rpcMethod, reqId, count);
if (isOverRate) {
isBan = true;

Expand Down Expand Up @@ -94,7 +188,7 @@ export async function rateLimit(
};
res.status(httpRateLimitCode).header(httpRateLimitHeader).send(content);
} else {
await accessGuard.updateCount(rpcMethod, reqId);
await accessGuard.updateCount(rpcMethod, reqId, count);
}
}
return isBan;
Expand All @@ -120,7 +214,7 @@ export async function wsRateLimit(
};

logger.debug(
`Rate Limit Exceed, ip: ${reqId}, method: ${rpcMethod}, ttl: ${remainSecs}s`
`WS Rate Limit Exceed, ip: ${reqId}, method: ${rpcMethod}, ttl: ${remainSecs}s`
);
return { error, remainSecs };
} else {
Expand All @@ -129,12 +223,19 @@ export async function wsRateLimit(
return undefined;
}

export function hasMethod(body: any, name: string) {
export function isBatchLimit(body: any) {
if (Array.isArray(body)) {
return body.length >= accessGuard.batchLimit;
}
return false;
}

export function calcMethodCount(body: any, targetMethod: string): number {
if (Array.isArray(body)) {
return body.map((b) => b.method).includes(name);
return body.filter((b) => b.method === targetMethod).length;
}

return body.method === name;
return body.method === targetMethod ? 1 : 0;
}

export function getIp(req: Request) {
Expand Down
35 changes: 26 additions & 9 deletions packages/api-server/src/ws/methods.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@ import { Log, LogQueryOption, toApiLog } from "../db/types";
import { filterLogsByAddress, filterLogsByTopics, Query } from "../db";
import { Store } from "../cache/store";
import { CACHE_EXPIRED_TIME_MILSECS } from "../cache/constant";
import { wsApplyRateLimitByIp } from "../rate-limit";
import {
wsApplyBatchRateLimitByIp,
wsApplyRateLimitByIp,
wsBatchLimit,
} from "../rate-limit";
import { gwTxHashToEthTxHash } from "../cache/tx-hash";
import { isInstantFinalityHackMode } from "../util";

Expand Down Expand Up @@ -73,16 +77,29 @@ export function wrapper(ws: any, req: any) {
const callback = (err: any, result: any) => {
return { err, result };
};

// check batch limit
const errs = wsBatchLimit(objs);
if (errs != null) {
return cb(
errs.map((err) => {
return { err };
})
);
}

// check batch rate limit
const batchErrs = await wsApplyBatchRateLimitByIp(req, objs);
if (batchErrs != null) {
return cb(
batchErrs.map((err) => {
return { err };
})
);
}

const info = await Promise.all(
objs.map(async (obj) => {
// check rate limit
const err = await wsApplyRateLimitByIp(req, obj.method);
if (err != null) {
return {
err,
};
}

if (obj.method === "eth_subscribe") {
const r = ethSubscribe(obj.params, callback);
return r;
Expand Down

0 comments on commit 72dcbd0

Please sign in to comment.