
Fix/multi endpint delay #2582
Merged — 9 commits, merged Nov 12, 2024
Changes from 3 commits
3 changes: 3 additions & 0 deletions packages/node-core/CHANGELOG.md
@@ -6,6 +6,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Fixed
- Block fetching delays when multiple endpoints are configured and network conditions are poor (#2572)

## [14.1.7] - 2024-10-30
### Changed
- Bump `@subql/common` dependency
28 changes: 27 additions & 1 deletion packages/node-core/src/indexer/connectionPool.service.spec.ts
@@ -2,7 +2,7 @@
// SPDX-License-Identifier: GPL-3.0

import {delay} from '@subql/common';
import {ApiErrorType, ConnectionPoolStateManager, IApiConnectionSpecific, NodeConfig} from '..';
import {ApiErrorType, ConnectionPoolStateManager, getLogger, IApiConnectionSpecific, NodeConfig} from '..';
import {ConnectionPoolService} from './connectionPool.service';

async function waitFor(conditionFn: () => boolean, timeout = 50000, interval = 100): Promise<void> {
@@ -127,4 +127,30 @@ describe('ConnectionPoolService', () => {
expect(handleApiDisconnectsSpy).toHaveBeenCalledTimes(1);
}, 15000);
});

describe('Rate limit endpoint delay 20s', () => {
it('call delay', async () => {
const logger = getLogger('connection-pool');
const consoleSpy = jest.spyOn(logger, 'info');

await connectionPoolService.addToConnections(mockApiConnection, TEST_URL);
await connectionPoolService.addToConnections(mockApiConnection, `${TEST_URL}/2`);
await connectionPoolService.handleApiError(TEST_URL, {
name: 'timeout',
errorType: ApiErrorType.Timeout,
message: 'timeout error',
});
await connectionPoolService.handleApiError(`${TEST_URL}/2`, {
name: 'DefaultError',
errorType: ApiErrorType.Default,
message: 'Default error',
});
await (connectionPoolService as any).flushResultCache();

await connectionPoolService.api.fetchBlocks([34365]);

expect(consoleSpy).toHaveBeenCalledWith('throtling on ratelimited endpoint 20s');
consoleSpy.mockRestore();
}, 30000);
});
});
6 changes: 3 additions & 3 deletions packages/node-core/src/indexer/connectionPool.service.ts
@@ -104,9 +104,9 @@ export class ConnectionPoolService<T extends IApiConnectionSpecific<any, any, an
try {
// Check if the endpoint is rate-limited
Collaborator: This is still necessary or there will be no throttling

if (await this.poolStateManager.getFieldValue(endpoint, 'rateLimited')) {
logger.info('throtling on ratelimited endpoint');
const backoffDelay = await this.poolStateManager.getFieldValue(endpoint, 'backoffDelay');
await delay(backoffDelay / 1000);
const rateLimitDelay = await this.poolStateManager.getFieldValue(endpoint, 'rateLimitDelay');
logger.info(`throtling on ratelimited endpoint ${rateLimitDelay / 1000}s`);
await delay(rateLimitDelay / 1000);
Collaborator: backoffDelay is no longer used anywhere

Collaborator (author): The backoffDelay parameter is mainly used when setting setRecoverTimeout, and it is also reported by the interval function that logs the current endpoint status (logEndpointStatus).

}

const start = Date.now();
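To make the distinction discussed in the two review threads above concrete, here is a minimal, self-contained sketch of the two delay mechanisms: a small fixed per-request throttle (rateLimitDelay) applied while an endpoint is flagged as rate-limited, and an exponential backoffDelay that only schedules the recovery timer (cf. setRecoverTimeout). The type and function names, and the seconds-based delay helper, are illustrative assumptions and not the actual @subql/node-core implementation; the constants mirror the ones added in this PR.

```ts
// Illustrative sketch only — names and structure are assumptions, not the real @subql/node-core API.
type PoolItem = {
  rateLimited: boolean;
  rateLimitDelay: number; // ms; small fixed throttle applied to every request while rate-limited
  backoffDelay: number; // ms; exponential delay, only used to schedule recovery
  failureCount: number;
};

const RETRY_DELAY = 60 * 1000;
const MAX_RETRY_DELAY = 60 * RETRY_DELAY;
const RATE_LIMIT_DELAY = 20 * 1000;

// Stand-in for the delay helper, which the diff suggests takes seconds (hence the `/ 1000`).
const delay = (seconds: number) => new Promise<void>((resolve) => setTimeout(resolve, seconds * 1000));

// Exponential backoff: start at RETRY_DELAY, double per failure, cap at MAX_RETRY_DELAY.
function nextBackoff(item: PoolItem): number {
  return Math.min(RETRY_DELAY * Math.pow(2, item.failureCount - 1), MAX_RETRY_DELAY);
}

// Mark an endpoint rate-limited: the fixed throttle is applied per request,
// while the exponential backoff only drives the recovery timer.
function recordRateLimit(item: PoolItem): void {
  item.failureCount += 1;
  item.rateLimited = true;
  item.rateLimitDelay = RATE_LIMIT_DELAY;
  item.backoffDelay = nextBackoff(item);
  setTimeout(() => {
    item.rateLimited = false;
    item.rateLimitDelay = 0;
    item.backoffDelay = 0;
  }, item.backoffDelay);
}

// Per-request throttle: without this wait there is no throttling at all, because a
// rate-limited endpoint can still be selected when no healthy endpoint is available.
async function throttledRequest<T>(item: PoolItem, request: () => Promise<T>): Promise<T> {
  if (item.rateLimited) {
    console.info(`throttling on rate-limited endpoint ${item.rateLimitDelay / 1000}s`);
    await delay(item.rateLimitDelay / 1000);
  }
  return request();
}
```

Under this scheme a rate-limited endpoint keeps serving blocks after a short fixed wait (20 s here), while the much larger exponential backoff only decides when its flags are cleared — which is the behaviour the updated test above asserts.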
87 changes: 40 additions & 47 deletions packages/node-core/src/indexer/connectionPoolState.manager.spec.ts
@@ -1,7 +1,8 @@
// Copyright 2020-2024 SubQuery Pte Ltd authors & contributors
// SPDX-License-Identifier: GPL-3.0

import {ConnectionPoolStateManager} from './connectionPoolState.manager';
import {ApiErrorType} from '../api.connection.error';
import {ConnectionPoolItem, ConnectionPoolStateManager} from './connectionPoolState.manager';

describe('ConnectionPoolStateManager', function () {
let connectionPoolStateManager: ConnectionPoolStateManager<any>;
@@ -12,62 +13,54 @@ describe('ConnectionPoolStateManager', function () {
connectionPoolStateManager = new ConnectionPoolStateManager();
});

afterEach(async function () {
await connectionPoolStateManager.onApplicationShutdown();
});

it('chooses primary endpoint first', async function () {
(connectionPoolStateManager as any).pool[EXAMPLE_ENDPOINT1] = {
primary: true,
performanceScore: 100,
failureCount: 0,
endpoint: '',
backoffDelay: 0,
rateLimited: false,
failed: false,
connected: true,
lastRequestTime: 0,
};

(connectionPoolStateManager as any).pool[EXAMPLE_ENDPOINT2] = {
primary: false,
performanceScore: 100,
failureCount: 0,
endpoint: '',
backoffDelay: 0,
rateLimited: false,
failed: false,
connected: true,
lastRequestTime: 0,
};
await connectionPoolStateManager.addToConnections(EXAMPLE_ENDPOINT1, true);
await connectionPoolStateManager.addToConnections(EXAMPLE_ENDPOINT2, false);

expect(await connectionPoolStateManager.getNextConnectedEndpoint()).toEqual(EXAMPLE_ENDPOINT1);
});

it('does not choose primary endpoint if failed', async function () {
(connectionPoolStateManager as any).pool[EXAMPLE_ENDPOINT1] = {
primary: true,
performanceScore: 100,
failureCount: 0,
endpoint: '',
backoffDelay: 0,
rateLimited: false,
failed: false,
connected: false,
lastRequestTime: 0,
};

(connectionPoolStateManager as any).pool[EXAMPLE_ENDPOINT2] = {
primary: false,
performanceScore: 100,
failureCount: 0,
endpoint: '',
backoffDelay: 0,
rateLimited: false,
failed: false,
connected: true,
lastRequestTime: 0,
};
await connectionPoolStateManager.addToConnections(EXAMPLE_ENDPOINT1, true);
await connectionPoolStateManager.addToConnections(EXAMPLE_ENDPOINT2, false);

await connectionPoolStateManager.handleApiError(EXAMPLE_ENDPOINT1, ApiErrorType.Default);

expect(await connectionPoolStateManager.getNextConnectedEndpoint()).toEqual(EXAMPLE_ENDPOINT2);
});

it('All endpoints backoff; select a rateLimited endpoint. reason: ApiErrorType.Timeout', async function () {
await connectionPoolStateManager.addToConnections(EXAMPLE_ENDPOINT1, false);
await connectionPoolStateManager.addToConnections(EXAMPLE_ENDPOINT2, false);

await connectionPoolStateManager.handleApiError(EXAMPLE_ENDPOINT1, ApiErrorType.Default);
await connectionPoolStateManager.handleApiError(EXAMPLE_ENDPOINT2, ApiErrorType.Timeout);

const nextEndpoint = await connectionPoolStateManager.getNextConnectedEndpoint();
const endpointInfo = (connectionPoolStateManager as any).pool[EXAMPLE_ENDPOINT2] as ConnectionPoolItem<any>;
expect(nextEndpoint).toEqual(EXAMPLE_ENDPOINT2);
expect(endpointInfo.rateLimited).toBe(true);
expect((connectionPoolStateManager as any).pool[EXAMPLE_ENDPOINT2].rateLimitDelay).toBe(20 * 1000);
});

it('All endpoints backoff; select a rateLimited endpoint. reason: ApiErrorType.RateLimit', async function () {
await connectionPoolStateManager.addToConnections(EXAMPLE_ENDPOINT1, false);
await connectionPoolStateManager.addToConnections(EXAMPLE_ENDPOINT2, false);

await connectionPoolStateManager.handleApiError(EXAMPLE_ENDPOINT1, ApiErrorType.Default);
await connectionPoolStateManager.handleApiError(EXAMPLE_ENDPOINT2, ApiErrorType.RateLimit);

const nextEndpoint = await connectionPoolStateManager.getNextConnectedEndpoint();
const endpointInfo = (connectionPoolStateManager as any).pool[EXAMPLE_ENDPOINT2] as ConnectionPoolItem<any>;
expect(nextEndpoint).toEqual(EXAMPLE_ENDPOINT2);
expect(endpointInfo.rateLimited).toBe(true);
expect((connectionPoolStateManager as any).pool[EXAMPLE_ENDPOINT2].rateLimitDelay).toBe(20 * 1000);
});

it('can calculate performance score for response time of zero', function () {
const score = (connectionPoolStateManager as any).calculatePerformanceScore(0, 0);
expect(score).not.toBeNaN();
52 changes: 32 additions & 20 deletions packages/node-core/src/indexer/connectionPoolState.manager.ts
@@ -11,9 +11,11 @@ import {exitWithError} from '../process';
import {errorTypeToScoreAdjustment} from './connectionPool.service';

const RETRY_DELAY = 60 * 1000;
const MAX_RETRY_DELAY = 60 * RETRY_DELAY;
const MAX_FAILURES = 5;
const RESPONSE_TIME_WEIGHT = 0.7;
const FAILURE_WEIGHT = 0.3;
const RATE_LIMIT_DELAY = 20 * 1000;

export interface ConnectionPoolItem<T> {
endpoint: string;
@@ -22,6 +24,7 @@
backoffDelay: number;
failureCount: number;
rateLimited: boolean;
rateLimitDelay: number;
failed: boolean;
lastRequestTime: number;
connected: boolean;
@@ -72,6 +75,7 @@ export class ConnectionPoolStateManager<T extends IApiConnectionSpecific<any, an
endpoint,
backoffDelay: 0,
rateLimited: false,
rateLimitDelay: 0,
failed: false,
connected: true,
lastRequestTime: 0,
@@ -191,13 +195,14 @@ export class ConnectionPoolStateManager<T extends IApiConnectionSpecific<any, an
}

//eslint-disable-next-line @typescript-eslint/require-await
async setTimeout(endpoint: string, delay: number): Promise<void> {
async setRecoverTimeout(endpoint: string, delay: number): Promise<void> {
// Make sure there is no existing timeout
await this.clearTimeout(endpoint);

this.pool[endpoint].timeoutId = setTimeout(() => {
this.pool[endpoint].backoffDelay = 0; // Reset backoff delay only if there are no consecutive errors
this.pool[endpoint].rateLimited = false;
this.pool[endpoint].rateLimitDelay = 0;
this.pool[endpoint].failed = false;
this.pool[endpoint].timeoutId = undefined; // Clear the timeout ID

@@ -247,35 +252,42 @@ export class ConnectionPoolStateManager<T extends IApiConnectionSpecific<any, an
switch (errorType) {
case ApiErrorType.Connection: {
if (this.pool[endpoint].connected) {
//handleApiDisconnects was already called if this is false
//this.handleApiDisconnects(endpoint);
// The connected status does not provide service. handleApiDisconnects() will be called to handle this.
this.pool[endpoint].connected = false;
}
return;
}
case ApiErrorType.Timeout:
case ApiErrorType.RateLimit: {
// The “rateLimited” status will be selected when no endpoints are available, so we should avoid setting a large delay.
this.pool[endpoint].rateLimited = true;
this.pool[endpoint].rateLimitDelay = RATE_LIMIT_DELAY;
break;
}
case ApiErrorType.Default: {
const nextDelay = RETRY_DELAY * Math.pow(2, this.pool[endpoint].failureCount - 1); // Exponential backoff using failure count // Start with RETRY_DELAY and double on each failure
this.pool[endpoint].backoffDelay = nextDelay;

if (ApiErrorType.Timeout || ApiErrorType.RateLimit) {
this.pool[endpoint].rateLimited = true;
} else {
this.pool[endpoint].failed = true;
}

await this.setTimeout(endpoint, nextDelay);

logger.warn(
`Endpoint ${this.pool[endpoint].endpoint} experienced an error (${errorType}). Suspending for ${
nextDelay / 1000
}s.`
);
return;
// The “failed” status does not provide service.
this.pool[endpoint].failed = true;
break;
}
default: {
throw new Error(`Unknown error type ${errorType}`);
}
}

const nextDelay = this.calculateNextDelay(this.pool[endpoint]);
this.pool[endpoint].backoffDelay = nextDelay;
await this.setRecoverTimeout(endpoint, nextDelay);

logger.warn(
`Endpoint ${this.pool[endpoint].endpoint} experienced an error (${errorType}). Suspending for ${
nextDelay / 1000
}s.`
);
}

private calculateNextDelay(poolItem: ConnectionPoolItem<T>): number {
// Exponential backoff using failure count, Start with RETRY_DELAY and double on each failure, MAX_RETRY_DELAY is the maximum delay
return Math.min(RETRY_DELAY * Math.pow(2, poolItem.failureCount - 1), MAX_RETRY_DELAY);
}
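As a quick, stand-alone check of the capped exponential backoff implemented by calculateNextDelay above, using the RETRY_DELAY and MAX_RETRY_DELAY constants added in this diff (the loop itself is illustrative and not part of the PR):

```ts
// Delay progression produced by the capped exponential backoff in this diff:
// failureCount 1 -> 60s, 2 -> 120s, 3 -> 240s, 4 -> 480s, 5 -> 960s, 6 -> 1920s, 7+ -> 3600s (cap).
const RETRY_DELAY = 60 * 1000;
const MAX_RETRY_DELAY = 60 * RETRY_DELAY;
for (let failureCount = 1; failureCount <= 8; failureCount++) {
  const next = Math.min(RETRY_DELAY * Math.pow(2, failureCount - 1), MAX_RETRY_DELAY);
  console.log(`failureCount=${failureCount} -> ${next / 1000}s`);
}
```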

private calculatePerformanceScore(responseTime: number, failureCount: number): number {