From 33221f71f9c3dfe70f733758c9281e50ffcbd001 Mon Sep 17 00:00:00 2001 From: Scott Twiname Date: Tue, 3 Dec 2024 13:55:03 +1300 Subject: [PATCH] Address comments --- .../src/indexer/blockDispatcher/block-dispatcher.ts | 1 - packages/node-core/src/utils/queues/rampQueue.ts | 6 ++++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts b/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts index 5c92cd7d71..151aed8cb7 100644 --- a/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts +++ b/packages/node-core/src/indexer/blockDispatcher/block-dispatcher.ts @@ -16,7 +16,6 @@ import {StoreService} from '../store.service'; import {IStoreModelProvider} from '../storeModelProvider'; import {IProjectService, ISubqueryProject} from '../types'; import {BaseBlockDispatcher, ProcessBlockResponse} from './base-block-dispatcher'; -// import { RampQueue } from '@subql/node-core/utils/rampQueue'; const logger = getLogger('BlockDispatcherService'); diff --git a/packages/node-core/src/utils/queues/rampQueue.ts b/packages/node-core/src/utils/queues/rampQueue.ts index 0a100cbed5..a39a6f3fe7 100644 --- a/packages/node-core/src/utils/queues/rampQueue.ts +++ b/packages/node-core/src/utils/queues/rampQueue.ts @@ -23,6 +23,7 @@ const logger = getLogger('RampQueue'); export class RampQueue extends AutoQueue { #maxConcurrency: number; #sizes: number[] = []; + #totalItems = 0; constructor( private getSize: (data: T) => number, @@ -72,9 +73,9 @@ export class RampQueue extends AutoQueue { if (size > m * 2) { // Inverse of the size compared to the median. E.g if a block is 5x as big as the median then the batch size should be 1/5 of the max - const multiplier = 1 / (size / m); + const multiplier = m / size; this.setConcurrency(this.#maxConcurrency * multiplier); - } else if (this.#sizes.length % MIN_SIZES === 0) { + } else if (this.#totalItems % MIN_SIZES === 0) { // Increase by 10% of max this.setConcurrency(this.concurrency + Math.floor(this.#maxConcurrency / 10)); } @@ -87,6 +88,7 @@ export class RampQueue extends AutoQueue { if (this.#sizes.length >= MAX_SIZES) { this.#sizes.shift(); } + this.#totalItems++; this.#sizes.push(size); } }