Skip to content

Commit

Permalink
Merge pull request #113 from AplinkosMinisterija/fixing-cpu-overkill
Browse files Browse the repository at this point in the history
Fixing cpu overkill
  • Loading branch information
ambrazasp authored Nov 9, 2024
2 parents 68b6e0a + 26087c1 commit d51217b
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 12 deletions.
8 changes: 8 additions & 0 deletions mixins/database.mixin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,14 @@ export default function (opts: any = {}) {
return;
},

async removeMany(ctx: any) {
// moleculer removeEntities doesn't have great garbage collection
// it increases CPU until it crashes if called many times
const adapter = await this.getAdapter(ctx);
const table = adapter.getTable();
return table.whereIn('id', ctx.params.id).update({ deletedAt: new Date() }, ['id']);
},

async removeAllEntities(ctx: any) {
return await this.clearEntities(ctx);
},
Expand Down
25 changes: 13 additions & 12 deletions mixins/integrations.mixin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,44 +132,45 @@ export function IntegrationsMixin() {
app: { $in: apps.map((a) => a.id) },
};

let itemsCount: number = await ctx.call('events.count', { query, scope: false });
const totalCount = itemsCount;
let page = 1;
const totalCount: number = await ctx.call('events.count', { query, scope: false });
this.stats.invalid.removed = 0;
const startTime = new Date();

const fields = ['id', 'deletedAt', 'externalId'];

while (itemsCount > 0) {
// remove with pagination
const pageSize = 10000;

for (let page = 1; page < Math.ceil(totalCount / pageSize); page++) {
const eventsPage: DBPagination<Event<null, 'id' | 'deletedAt' | 'externalId'>> =
await ctx.call('events.list', {
query,
pageSize: 10000,
pageSize,
page,
fields,
sort: 'id',
scope: false, // needed for not skipping any events
});

itemsCount = itemsCount - eventsPage.rows.length;
page++;

if (!eventsPage.rows.length) {
itemsCount = 0;
continue;
}

const invalidEvents = eventsPage.rows.filter(
(item) => !validExternalIds.includes(item.externalId) && !item.deletedAt,
);

for (const e of invalidEvents) {
await ctx.call('events.remove', { id: e.id });
this.addTotal();
this.addInvalid();
this.stats.invalid.removed++;
}

const progress = this.calcProgression(totalCount - itemsCount, totalCount, startTime);
const eventIds = invalidEvents.map((e) => e.id);
if (eventIds?.length) {
await ctx.call('events.removeMany', { id: eventIds });
}

const progress = this.calcProgression(page * pageSize, totalCount, startTime);
this.broker.logger.info(`${this.name} removing in progress: ${progress.text}`);
}
},
Expand Down

0 comments on commit d51217b

Please sign in to comment.