From a89f51c43b1038cc3ad393e69735cabbe9f1009a Mon Sep 17 00:00:00 2001 From: John Gomersall Date: Fri, 31 May 2024 17:14:31 +0100 Subject: [PATCH 1/4] Failing test to catch PostgreSQL warnings Signed-off-by: John Gomersall --- src/domain/services/import.service.spec.ts | 75 ++++++++++++++-------- src/mikro-orm.config.ts | 13 ++++ test/test.helper.ts | 26 +++++++- 3 files changed, 87 insertions(+), 27 deletions(-) diff --git a/src/domain/services/import.service.spec.ts b/src/domain/services/import.service.spec.ts index 7f50450..d320cd3 100644 --- a/src/domain/services/import.service.spec.ts +++ b/src/domain/services/import.service.spec.ts @@ -13,7 +13,6 @@ import { createClient } from 'redis'; import { GenericContainer } from 'testcontainers'; import { setTimeout } from 'timers/promises'; -let index = 0; const lastModified = 1692032161; function testProducts() { @@ -36,24 +35,30 @@ function testProducts() { return { products, productIdExisting, productIdNew }; } -// Need to specify argumet list bellow so that calls in the assertion is typed -// eslint-disable-next-line @typescript-eslint/no-unused-vars -const findMock = jest.fn((_filter, _projection) => ({ - next: async () => { - return index++ <= mockedProducts.length ? mockedProducts[index - 1] : null; - }, - close: jest.fn(), -})); +const findCalls = []; jest.mock('mongodb', () => { return { MongoClient: jest.fn(() => ({ connect: jest.fn(), - db: () => ({ - collection: () => ({ - find: findMock, - }), - }), + db: () => { + let index = 0; + return { + collection: () => ({ + find: (...args: any) => { + findCalls.push(args); + return { + next: async () => { + return index++ <= mockedProducts.length + ? mockedProducts[index - 1] + : null; + }, + close: jest.fn(), + }; + }, + }), + }; + }, close: jest.fn(), })), }; @@ -61,12 +66,12 @@ jest.mock('mongodb', () => { let mockedProducts = []; function mockMongoDB(productList) { - index = 0; mockedProducts = productList; } // Import tests can sometimes take a little time in GitHub -jest.setTimeout(10000); +// Plus Allow a little time for the testcontainer to start +jest.setTimeout(300000); describe('importFromMongo', () => { it('should import a new product update existing products and delete missing products', async () => { @@ -103,7 +108,7 @@ describe('importFromMongo', () => { await importService.importFromMongo(); - // THEN: New product is addeded, updated product is updated and other product is unchanged + // THEN: New product is added, updated product is updated and other product is unchanged expect(deleteMock).toHaveBeenCalledTimes(1); let updateId = deleteMock.mock.calls[0][0]; // Re-format updateId the way Postgres provides it @@ -213,12 +218,12 @@ describe('importFromMongo', () => { // WHEN: Doing an incremental import from MongoDB mockMongoDB(products); - findMock.mockClear(); + findCalls.length = 0; await importService.importFromMongo(''); // THEN: Mongo find is called with the setting as a parameter - expect(findMock).toHaveBeenCalledTimes(2); // Called for normal an obsolete prodocuts - expect(findMock.mock.calls[0][0].last_modified_t.$gt).toBe( + expect(findCalls).toHaveLength(2); // Called for normal an obsolete prodocuts + expect(findCalls[0][0].last_modified_t.$gt).toBe( Math.floor(startFrom.getTime() / 1000), ); @@ -228,9 +233,9 @@ describe('importFromMongo', () => { }); }); - it('should cope with nul charactes', async () => { + it('should cope with nul characters', async () => { await createTestingModule([DomainModule], async (app) => { - // WHEN: Impoting data containing nul characters + // WHEN: Importing data containing nul characters const { productIdNew } = testProducts(); mockMongoDB([ { @@ -269,7 +274,6 @@ describe('importFromMongo', () => { // WHEN: Doing an import from MongoDB mockMongoDB(testData); - findMock.mockClear(); await importService.importFromMongo(''); // THEN: The last modified date is set correctly @@ -346,9 +350,30 @@ describe('ProductTag', () => { }); }); +describe('importWithFilter', () => { + it.only('should not get an error with concurrent imports', async () => { + await createTestingModule([DomainModule], async (app) => { + const importService = app.get(ImportService); + + // WHEN: Doing an incremental import from MongoDB + const { products, productIdExisting, productIdNew } = testProducts(); + mockMongoDB(products); + const imports = []; + // Need more than 10 concurrent imports to start to see errors + for (let i = 0; i < 11; i++) { + imports.push( + importService.importWithFilter( + { code: { $in: [productIdExisting, productIdNew] } }, + ProductSource.EVENT, + ), + ); + } + await Promise.all(imports); + }); + }); +}); + describe('receiveMessages', () => { - // Allow a little time for the testcontainer to start - jest.setTimeout(30000); it('should call importwithfilter when a message is received', async () => { await createTestingModule([DomainModule], async (app) => { // GIVEN: Redis is running diff --git a/src/mikro-orm.config.ts b/src/mikro-orm.config.ts index 7fff296..9628d15 100644 --- a/src/mikro-orm.config.ts +++ b/src/mikro-orm.config.ts @@ -7,6 +7,7 @@ import { defineConfig, } from '@mikro-orm/core'; import { SCHEMA } from './constants'; +import { Logger } from '@nestjs/common'; class DateTimeNtzType extends DateTimeType { getColumnType(): string { @@ -14,6 +15,8 @@ class DateTimeNtzType extends DateTimeType { } } +const logger = new Logger('MikroOrm'); + export default defineConfig({ entities: ['./dist/domain/entities'], entitiesTs: ['./src/domain/entities'], @@ -47,6 +50,16 @@ export default defineConfig({ path: 'dist/migrations', pathTs: 'src/migrations', }, + pool: { + afterCreate: function (conn: any, done: any) { + conn.query('select 1 as result', function (err) { + conn.on('notice', function (msg) { + logger.error('Notice from PostgreSQL: ' + msg.message); + }); + done(err, conn); + }); + }, + }, // Uncomment the below and 'app.useLogger(new Logger());' to the test to see Mikro-ORM logs // debug: ['query', 'query-params'], }); diff --git a/test/test.helper.ts b/test/test.helper.ts index 05e0da6..f15a199 100644 --- a/test/test.helper.ts +++ b/test/test.helper.ts @@ -1,5 +1,6 @@ import { MikroORM, RequestContext } from '@mikro-orm/core'; import { logger } from '@mikro-orm/nestjs'; +import { ConsoleLogger } from '@nestjs/common'; import { Test, TestingModule } from '@nestjs/testing'; import { randomBytes } from 'crypto'; @@ -7,20 +8,41 @@ export function randomCode() { return 'TEST-' + randomBytes(20).toString('base64'); } +class TestLogger extends ConsoleLogger { + errors = new Array(); + expectedErrors = 0; + constructor() { + super(); + this.setLogLevels(['error']); + } + error(message: string, ...rest: any[]) { + this.errors.push(message); + if (this.errors.length > this.expectedErrors) { + super.error(message, ...rest); + } + } + assertExpectedErrors() { + expect(this.errors).toHaveLength(this.expectedErrors); + } +} + export async function createTestingModule( imports: any[], - callback: { (app: TestingModule): Promise }, + callback: { (app: TestingModule, logger: TestLogger): Promise }, ) { + const testLogger = new TestLogger(); const app = await Test.createTestingModule({ imports: imports, }).compile(); + app.useLogger(testLogger); await app.init(); const orm = app.get(MikroORM); try { await RequestContext.createAsync(orm.em, async () => { - await callback(app); + await callback(app, testLogger); }); + testLogger.assertExpectedErrors(); } catch (e) { (e.errors ?? [e]).map((e) => logger.error(e.message, e.stack)); throw e; From 0093a103371eba847f41284e164567b1147d2ce1 Mon Sep 17 00:00:00 2001 From: John Gomersall Date: Fri, 31 May 2024 18:46:07 +0100 Subject: [PATCH 2/4] Fixes test but ideally want to make transactional Signed-off-by: John Gomersall --- src/domain/services/import.service.spec.ts | 4 ++-- src/domain/services/import.service.ts | 18 ++++++++++++------ 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/src/domain/services/import.service.spec.ts b/src/domain/services/import.service.spec.ts index d320cd3..a6b4b5e 100644 --- a/src/domain/services/import.service.spec.ts +++ b/src/domain/services/import.service.spec.ts @@ -1,6 +1,6 @@ import { DomainModule } from '../domain.module'; import { ImportService } from './import.service'; -import { EntityManager } from '@mikro-orm/core'; +import { EntityManager } from '@mikro-orm/postgresql'; import { Product } from '../entities/product'; import { ProductIngredientsTag } from '../entities/product-tags'; import { createTestingModule, randomCode } from '../../../test/test.helper'; @@ -351,7 +351,7 @@ describe('ProductTag', () => { }); describe('importWithFilter', () => { - it.only('should not get an error with concurrent imports', async () => { + it('should not get an error with concurrent imports', async () => { await createTestingModule([DomainModule], async (app) => { const importService = app.get(ImportService); diff --git a/src/domain/services/import.service.ts b/src/domain/services/import.service.ts index 3ceef49..167774a 100644 --- a/src/domain/services/import.service.ts +++ b/src/domain/services/import.service.ts @@ -131,7 +131,7 @@ export class ImportService { await client.close(); // Tags are popualted using raw SQL from the data field - await this.updateTags(updateId, fullImport); + await this.updateTags(updateId, fullImport, source); // If doing a full import delete all products that weren't updated if (fullImport) { @@ -236,14 +236,20 @@ export class ImportService { * SQL is then run to insert this into the individual tag tables. * This was found to be quicker than using ORM functionality */ - async updateTags(updateId: string, fullImport = false) { + async updateTags( + updateId: string, + fullImport = false, + source = ProductSource.FULL_LOAD, + ) { + const autoCommit = source !== ProductSource.EVENT; + this.logger.debug(`Updating tags for updateId: ${updateId}`); const connection = this.em.getConnection(); // Fix ingredients let logText = `Updated ingredients`; - await connection.execute('begin'); + if (autoCommit) await connection.execute('begin'); const deleted = await connection.execute( `delete from product_ingredient where product_id in (select id from product @@ -329,7 +335,7 @@ export class ImportService { affectedRows = results['affectedRows']; logText += ` > ${affectedRows}`; } - await connection.execute('commit'); + if (autoCommit) await connection.execute('commit'); this.logger.debug(logText + ' rows'); for (const [tag, entity] of Object.entries(ProductTagMap.MAPPED_TAGS)) { @@ -337,7 +343,7 @@ export class ImportService { // Get the underlying table name for the entity const tableName = this.em.getMetadata(entity).tableName; - await connection.execute('begin'); + if (autoCommit) await connection.execute('begin'); // Delete existing tags for products that were imorted on this run const deleted = await connection.execute( @@ -360,7 +366,7 @@ export class ImportService { ); // Commit after each tag to minimise server snapshot size - await connection.execute('commit'); + if (autoCommit) await connection.execute('commit'); // If this is a full load we can flag the tag as now available for query if (fullImport) { From f3fa1e5649c061f8ae466b008e46a871ae7dee49 Mon Sep 17 00:00:00 2001 From: John Gomersall Date: Sat, 1 Jun 2024 12:48:43 +0100 Subject: [PATCH 3/4] Shared memory to match production Signed-off-by: John Gomersall --- docker-compose.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/docker-compose.yml b/docker-compose.yml index 2ff1050..cfdf38b 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -6,6 +6,7 @@ services: - POSTGRES_USER - POSTGRES_PASSWORD - POSTGRES_DB + shm_size: 256m volumes: - dbdata:/var/lib/postgresql/data networks: From a8b1869d224763f32b7cfa02811808c483872546 Mon Sep 17 00:00:00 2001 From: John Gomersall Date: Mon, 3 Jun 2024 21:37:03 +0100 Subject: [PATCH 4/4] Address PR feedback Signed-off-by: John Gomersall --- .env | 1 + .github/workflows/container-deploy.yml | 3 +++ docker-compose.yml | 2 +- src/domain/services/import.service.ts | 12 ++++++------ src/mikro-orm.config.ts | 1 + 5 files changed, 12 insertions(+), 7 deletions(-) diff --git a/.env b/.env index 3e64455..7eff976 100644 --- a/.env +++ b/.env @@ -9,6 +9,7 @@ POSTGRES_PORT=127.0.0.1:5512 POSTGRES_DB=query POSTGRES_USER=productopener POSTGRES_PASSWORD=productopener +POSTGRES_SHM_SIZE=256m COMMON_NET_NAME=po_default MONGO_URI=mongodb://localhost:27017 #REDIS_URL=redis://localhost:6379 diff --git a/.github/workflows/container-deploy.yml b/.github/workflows/container-deploy.yml index bf173ac..9fa9069 100644 --- a/.github/workflows/container-deploy.yml +++ b/.github/workflows/container-deploy.yml @@ -34,6 +34,7 @@ jobs: echo "COMMON_NET_NAME=po_webnet" >> $GITHUB_ENV echo "MONGO_URI=mongodb://10.1.0.200:27017" >> $GITHUB_ENV echo "REDIS_URL=redis://redis:6379" >> $GITHUB_ENV + echo "POSTGRES_SHM_SIZE=1g" >> $GITHUB_ENV - name: Set various variable for production deployment if: matrix.env == 'off-query-org' run: | @@ -46,6 +47,7 @@ jobs: # mongodb and redis (through stunnel) echo "MONGO_URI=mongodb://10.1.0.102:27017" >> $GITHUB_ENV echo "REDIS_URL=redis://10.1.0.122:6379" >> $GITHUB_ENV + echo "POSTGRES_SHM_SIZE=256m" >> $GITHUB_ENV - name: Wait for container build workflow uses: tomchv/wait-my-workflow@v1.1.0 id: wait-build @@ -124,6 +126,7 @@ jobs: echo "COMMON_NET_NAME=${{ env.COMMON_NET_NAME }}" >> .env echo "MONGO_URI=${{ env.MONGO_URI }}" >> .env echo "REDIS_URL=${{ env.REDIS_URL }}" >> .env + echo "POSTGRES_SHM_SIZE=${{ env.POSTGRES_SHM_SIZE }}" >> .env echo "LOG_LEVEL=log" >> .env diff --git a/docker-compose.yml b/docker-compose.yml index cfdf38b..1f245f9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -6,7 +6,7 @@ services: - POSTGRES_USER - POSTGRES_PASSWORD - POSTGRES_DB - shm_size: 256m + shm_size: ${POSTGRES_SHM_SIZE} volumes: - dbdata:/var/lib/postgresql/data networks: diff --git a/src/domain/services/import.service.ts b/src/domain/services/import.service.ts index 167774a..08009a3 100644 --- a/src/domain/services/import.service.ts +++ b/src/domain/services/import.service.ts @@ -241,7 +241,8 @@ export class ImportService { fullImport = false, source = ProductSource.FULL_LOAD, ) { - const autoCommit = source !== ProductSource.EVENT; + // Commit after each tag for bulk (non-Redis) loads to minimise server snapshot size + const commitPerTag = source !== ProductSource.EVENT; this.logger.debug(`Updating tags for updateId: ${updateId}`); @@ -249,7 +250,7 @@ export class ImportService { // Fix ingredients let logText = `Updated ingredients`; - if (autoCommit) await connection.execute('begin'); + if (commitPerTag) await connection.execute('begin'); const deleted = await connection.execute( `delete from product_ingredient where product_id in (select id from product @@ -335,7 +336,7 @@ export class ImportService { affectedRows = results['affectedRows']; logText += ` > ${affectedRows}`; } - if (autoCommit) await connection.execute('commit'); + if (commitPerTag) await connection.execute('commit'); this.logger.debug(logText + ' rows'); for (const [tag, entity] of Object.entries(ProductTagMap.MAPPED_TAGS)) { @@ -343,7 +344,7 @@ export class ImportService { // Get the underlying table name for the entity const tableName = this.em.getMetadata(entity).tableName; - if (autoCommit) await connection.execute('begin'); + if (commitPerTag) await connection.execute('begin'); // Delete existing tags for products that were imorted on this run const deleted = await connection.execute( @@ -365,8 +366,7 @@ export class ImportService { 'run', ); - // Commit after each tag to minimise server snapshot size - if (autoCommit) await connection.execute('commit'); + if (commitPerTag) await connection.execute('commit'); // If this is a full load we can flag the tag as now available for query if (fullImport) { diff --git a/src/mikro-orm.config.ts b/src/mikro-orm.config.ts index 9628d15..2fd8482 100644 --- a/src/mikro-orm.config.ts +++ b/src/mikro-orm.config.ts @@ -52,6 +52,7 @@ export default defineConfig({ }, pool: { afterCreate: function (conn: any, done: any) { + // issue a query to verify SQL connection is working conn.query('select 1 as result', function (err) { conn.on('notice', function (msg) { logger.error('Notice from PostgreSQL: ' + msg.message);