diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml
index 8d6963b..583fe06 100644
--- a/.github/workflows/build-and-test.yml
+++ b/.github/workflows/build-and-test.yml
@@ -35,5 +35,6 @@ jobs:
           curl -fsSL "$ELASTICSEARCH_URL/_cat/health?h=status"
       - name: Run tests
         env:
+          INDEX_CHUNK_SIZE: 100
           ELASTICSEARCH_URL: http://localhost:${{ job.services.elasticsearch.ports[9200] }}
         run: npm test -- --forceExit
diff --git a/package.json b/package.json
index efc3a88..365fb9a 100644
--- a/package.json
+++ b/package.json
@@ -4,11 +4,10 @@
   "description": "Indexation of french companies from INSEE's database",
   "main": "src/index.ts",
   "scripts": {
-    "scalingo-postbuild": "npm run build && npm prune --production",
     "build": "tsc",
     "watch": "tsc -w",
     "types": "tsc -noEmit",
-    "start": "npm run build",
+    "start": "npx --yes http-server",
     "index": "npm run index:sirene && npm run index:siret",
     "preindex:dev": "export NODE_ENV=dev && docker-compose up -d elasticsearch && npm run build && npm run index:sirene && npm run index:siret",
     "index:dev": "npm run index:sirene:dev && npm run index:siret:dev",
diff --git a/src/commands/__tests__/index.integration.ts b/src/commands/__tests__/index.integration.ts
index 573f356..3195652 100644
--- a/src/commands/__tests__/index.integration.ts
+++ b/src/commands/__tests__/index.integration.ts
@@ -61,8 +61,6 @@ describe("Perform indexation", () => {
         }
       }
     };
-    // wait for the ES refresh cycle of 1sec
-    await new Promise(resolve => setTimeout(resolve, 1000));
 
     await elasticSearchClient.search(searchRequest).then(r => {
       if (r.body.timed_out) {
@@ -94,11 +92,10 @@ describe("Perform indexation", () => {
       body: {
         query: {
           match_all: {}
-        }
+        },
+        size: 300
       }
     };
-    // wait for the ES refresh cycle of 1sec
-    await new Promise(resolve => setTimeout(resolve, 1000));
 
     await elasticSearchClient.search(searchRequest).then(r => {
       if (r.body.timed_out) {
diff --git a/src/common/logger.ts b/src/common/logger.ts
index e833303..864a9f4 100644
--- a/src/common/logger.ts
+++ b/src/common/logger.ts
@@ -20,7 +20,7 @@ const LOG_TO_HTTP =
 const logFormat = format.combine(
   format.label({ label: "trackdechets-sirene-search" }),
   format.timestamp({
-    format: "HH-MM:ss YYYY-MM-DD"
+    format: "HH-mm:ss YYYY-MM-DD"
   }),
   format.prettyPrint(),
   format.colorize(),
diff --git a/src/indexation/__tests__/bulkIndex.test.ts b/src/indexation/__tests__/bulkIndex.test.ts
index d26bbe9..9e9ae3e 100644
--- a/src/indexation/__tests__/bulkIndex.test.ts
+++ b/src/indexation/__tests__/bulkIndex.test.ts
@@ -24,26 +24,16 @@ describe("bulkIndexByChunks", () => {
     dataFormatterFn: dataFormatterFnMock
   };
   const indexNameMock = "test_index";
-  const CHUNK_SIZE = 100;
+  const CHUNK_SIZE = parseInt(`${process.env.INDEX_CHUNK_SIZE}`, 10) || 100;
 
   beforeEach(() => {
     process.env.TD_SIRENE_INDEX_MAX_CONCURRENT_REQUESTS = "1";
-    process.env.INDEX_CHUNK_SIZE = `${CHUNK_SIZE}`;
     esClientMock.mockReset();
     dataFormatterFnMock.mockReset();
     esClientMock.mockImplementation(() => Promise.resolve());
     dataFormatterFnMock.mockImplementation((body, _) => Promise.resolve(body));
   });
 
-  test("Should immediately send a request if body size is less than CHUNK_SIZE", async () => {
-    // One document to index
-    const bodyMock: ElasticBulkNonFlatPayload = [
-      [{ index: { _id: "1", _index: indexNameMock } }, { field: "value1" }]
-    ];
-    await bulkIndexByChunks(bodyMock, indexConfigMock, indexNameMock);
-    expect(esClientMock).toHaveBeenCalledTimes(1);
-  });
-
   test("Should slice the body into chunks and send each as a separate request", async () => {
     // One Chunk + 1
     const bodyMock: ElasticBulkNonFlatPayload = Array(CHUNK_SIZE + 1)
@@ -72,13 +62,12 @@ describe("bulkIndexByChunks", () => {
   });
 
   test("Should call dataFormatterFn if it is a function", async () => {
-    // One document to index
-    const bodyMock: ElasticBulkNonFlatPayload = [
-      [
-        { index: { _id: "1", _index: indexNameMock } },
-        { my_document_field: "value1" }
-      ]
-    ];
+    const bodyMock: ElasticBulkNonFlatPayload = Array(CHUNK_SIZE + 1)
+      .fill(0)
+      .map((_, i) => [
+        { index: { _id: `${i}`, _index: indexNameMock } },
+        { my_document_field: `value${i}` }
+      ]);
     await bulkIndexByChunks(bodyMock, indexConfigMock, indexNameMock);
     expect(indexConfigMock.dataFormatterFn).toHaveBeenCalled();
   });
diff --git a/src/indexation/elasticSearch.helpers.ts b/src/indexation/elasticSearch.helpers.ts
index 4a66303..9bb9650 100644
--- a/src/indexation/elasticSearch.helpers.ts
+++ b/src/indexation/elasticSearch.helpers.ts
@@ -1,6 +1,6 @@
 import fs from "fs";
-import stream, { Writable } from "stream";
-import util from "util";
+import { Writable } from "stream";
+import { pipeline } from "node:stream/promises";
 import {
   BulkOperationContainer,
   BulkResponse
@@ -17,7 +17,6 @@ import {
   INDEX_ALIAS_NAME_SEPARATOR
 } from "./indexInsee.helpers";
 
 const { ResponseError } = errors;
-const pipeline = util.promisify(stream.pipeline);
 const pjson = require("../../package.json");
 
 /**
@@ -98,8 +97,7 @@ const finalizeNewIndexRelease = async (
     index: indexName,
     body: {
       index: {
-        number_of_replicas: process.env.TD_SIRENE_INDEX_NB_REPLICAS || "2", // 2 replicas is optimal for a 3 nodes cluster
-        refresh_interval: process.env.TD_SIRENE_INDEX_REFRESH_INTERVAL || "1s"
+        number_of_replicas: process.env.TD_SIRENE_INDEX_NB_REPLICAS || "2" // 2 replicas is optimal for a 3 nodes cluster
       }
     }
   });
@@ -141,49 +139,29 @@ const finalizeNewIndexRelease = async (
   }
 };
 
-/**
- * Log bulkIndex errors and retries in some cases
- */
-const logBulkErrorsAndRetry = async (
-  indexName: string,
+function sleep(ms: number) {
+  return new Promise(resolve => setTimeout(resolve, ms));
+}
+
+const getOperationsToRetry = (
   bulkResponse: BulkResponse,
-  body: BulkOperationContainer[]
+  bulkQueryBody: BulkOperationContainer[]
 ) => {
-  if (bulkResponse.errors) {
-    logger.error(
-      `BulkIndex ERROR on index ${indexName}, retrying but the index may be corrupt`
-    );
-    for (let k = 0; k < bulkResponse.items.length; k++) {
-      const action = bulkResponse.items[k]!;
-      const operations: string[] = Object.keys(action);
-      for (const operation of operations) {
-        const opType = operation;
-        if (opType && action[opType]?.error) {
-          // If the status is 429 it means that we can retry the document
-          if (action[opType]?.status === 429) {
-            try {
-              await client.index({
-                index: indexName,
-                id: body[k * 2].index?._id as string,
-                body: body[k * 2 + 1],
-                type: "_doc",
-                refresh: false
-              });
-            } catch (err) {
-              // do nothing
-              return;
-            }
-          }
-        }
+  const operationsToRetry: BulkOperationContainer[] = [];
+
+  for (let k = 0; k < bulkResponse.items.length; k++) {
+    const action = bulkResponse.items[k];
+    const operationTypes: string[] = Object.keys(action);
+    for (const opType of operationTypes) {
+      // If the status is 429 it means that we can retry the document
+      if (opType && action[opType]?.error && action[opType]?.status === 429) {
+        operationsToRetry.push(bulkQueryBody[k * 2], bulkQueryBody[k * 2 + 1]); // push [index header, index content]
       }
     }
   }
+  return operationsToRetry;
 };
 
-function sleep(ms: number) {
-  return new Promise(resolve => setTimeout(resolve, ms));
-}
-
 /**
  * Calls client.bulk and retry
  */
@@ -193,7 +171,7 @@ const requestBulkIndex = async (
 ): Promise => {
   const maxRetries = 5;
   let retries = 0;
-  let waitTime = 1000; // Initial wait time in milliseconds
+  let waitTime = 5000; // in milliseconds
 
   if (!body || !body.length) {
     // nothing to index
@@ -207,12 +185,16 @@ const requestBulkIndex = async (
       _source_excludes: ["items.index._*", "took"]
     });
     // Log error data and continue
-    if (bulkResponse) {
-      await logBulkErrorsAndRetry(indexName, bulkResponse.body, body);
+    if (bulkResponse && bulkResponse.body.errors) {
+      logger.error(
+        `BulkIndex ERROR on index ${indexName}, retrying but the index may be corrupt`
+      );
+      const toRetry = getOperationsToRetry(bulkResponse.body, body);
+      await requestBulkIndex(indexName, toRetry);
     }
     return;
   } catch (bulkIndexError) {
-    // this error ihappens when the Elasticserver cannot take more data input
+    // this error happens when the Elasticserver cannot take more data input
     // so we can sleep and retry
     if (
       bulkIndexError instanceof ResponseError &&
@@ -274,6 +256,15 @@ const request = async (
   );
 };
 
+// Queue holding the bulk indexation requests promises
+// It's a global variable to keep track of the promises
+// across the different chunk calls.
+const indexPromisesQueue: Promise[] = [];
+
+// Buffer to accumulate the body before indexing,
+// to avoid indexing chunks that are too small
+let bodyBuffer: ElasticBulkNonFlatPayload = [];
+
 /**
  * Bulk Index and collect errors
  * controls the maximum chunk size because unzip does not
@@ -283,43 +274,45 @@ export const bulkIndexByChunks = async (
   indexConfig: IndexProcessConfig,
   indexName: string
 ): Promise => {
-  // immediat return the chunk when size is greater than the data streamed
-  if (CHUNK_SIZE > body.length) {
-    await request(indexName, indexConfig, body);
+  // Accumulate chunks in a buffer to avoid indexing chunks that are too small
+  bodyBuffer.push(...body);
+  if (bodyBuffer.length < CHUNK_SIZE) {
     return;
   }
-  const promises: Promise[] = [];
-  // number if chunk requests in-flight
-  let numberOfChunkRequests = 0;
-  logger.info(
-    `Number of chunks to process : ${Math.floor(body.length / CHUNK_SIZE)}`
-  );
-  // loop over other chunks
-  for (let i = 0; i < body.length; i += CHUNK_SIZE) {
+
+  for (let i = 0; i < bodyBuffer.length; i += CHUNK_SIZE) {
+    if (indexPromisesQueue.length >= TD_SIRENE_INDEX_MAX_CONCURRENT_REQUESTS) {
+      await Promise.race(indexPromisesQueue);
+    }
+
     const end = i + CHUNK_SIZE;
-    const slice = body.slice(i, end);
+    const slice = bodyBuffer.slice(i, end);
     const promise = request(indexName, indexConfig, slice);
-    if (TD_SIRENE_INDEX_MAX_CONCURRENT_REQUESTS > 1) {
-      promises.push(promise);
-      numberOfChunkRequests++; // Increment the in-flight counter
-
-      // Check if the maximum number of promises is reached
-      if (numberOfChunkRequests >= TD_SIRENE_INDEX_MAX_CONCURRENT_REQUESTS) {
-        await Promise.race(promises); // Wait for any one of the promises to resolve
-        numberOfChunkRequests--; // Decrement the in-flight counter
-      }
-    } else {
-      // no concurrency
-      await promise;
-      if (TD_SIRENE_INDEX_SLEEP_BETWEEN_CHUNKS) {
-        await sleep(TD_SIRENE_INDEX_SLEEP_BETWEEN_CHUNKS);
-      }
+
+    const autoCleanPromise = promise.then(() => {
+      indexPromisesQueue.splice(
+        indexPromisesQueue.indexOf(autoCleanPromise),
+        1
+      );
+    });
+
+    indexPromisesQueue.push(autoCleanPromise);
+
+    // Wait between chunks can be useful to slow down the write stream,
+    // and avoid having too many small chunks in the queue
+    if (TD_SIRENE_INDEX_SLEEP_BETWEEN_CHUNKS) {
+      await sleep(TD_SIRENE_INDEX_SLEEP_BETWEEN_CHUNKS);
     }
   }
-  if (promises.length > 0) {
-    await Promise.all(promises);
-  }
+
+  bodyBuffer = [];
+};
+
+export const flushBuffer = async (
+  indexConfig: IndexProcessConfig,
+  indexName: string
+) => {
+  await request(indexName, indexConfig, bodyBuffer);
 };
@@ -331,13 +324,13 @@ const getWritableParserAndIndexer = (
 ) =>
   new Writable({
     // Increase memory usage for better performance
-    // defauly 16 KiB (16*1024=16384)
+    // default is 16 KiB (16*1024=16384)
     highWaterMark: TD_SIRENE_INDEX_MAX_HIGHWATERMARK,
     objectMode: true,
     writev: (csvLines, next) => {
       const body: ElasticBulkNonFlatPayloadWithNull = csvLines.map(
-        (chunk, _i) => {
-          const doc = chunk.chunk;
+        (csvLine, _i) => {
+          const doc = csvLine.chunk;
           // skip lines without "idKey" column because we cannot miss the _id in ES
           if (
             doc[indexConfig.idKey] === undefined ||
@@ -351,7 +344,6 @@ const getWritableParserAndIndexer = (
           return [
             {
               index: {
-                _id: doc[indexConfig.idKey],
                 _index: indexName,
                 // Next major ES version won't need _type anymore
                 _type: "_doc"
@@ -370,6 +362,11 @@ const getWritableParserAndIndexer = (
       )
         .then(() => next())
        .catch(err => next(err));
+    },
+    final: async callback => {
+      // Because we buffer chunks, we need to flush it at the end
+      await flushBuffer(indexConfig, indexName);
+      callback();
     }
   });
@@ -383,36 +380,41 @@ export const streamReadAndIndex = async (
   isReleaseIndexation = true
 ): Promise => {
   const headers = indexConfig.headers;
-  const writableStream = getWritableParserAndIndexer(indexConfig, indexName);
   // stop parsing CSV after MAX_ROWS
   const maxRows = parseInt(process.env.MAX_ROWS as string, 10);
-  await pipeline(
-    fs.createReadStream(csvPath),
-    parse({
-      headers,
-      ignoreEmpty: true,
-      discardUnmappedColumns: true,
-      ...(maxRows && { maxRows })
+
+  const readableStream = fs.createReadStream(csvPath);
+  const parseCsvStream = parse({
+    headers,
+    ignoreEmpty: true,
+    discardUnmappedColumns: true,
+    ...(maxRows && { maxRows })
+  })
+    .transform((data, callback) => {
+      if (!!indexConfig.transformCsv) {
+        indexConfig.transformCsv(data, callback);
+      } else {
+        callback(null, data);
+      }
     })
-      .transform((data, callback) => {
-        if (!!indexConfig.transformCsv) {
-          indexConfig.transformCsv(data, callback);
-        } else {
-          callback(null, data);
-        }
-      })
-      .on("error", error => {
-        throw error;
-      })
-      .on("end", async (rowCount: number) => {
-        logger.info(`Finished parsing ${rowCount} CSV rows`);
-      }),
-    writableStream
-  );
+    .on("error", error => {
+      throw error;
+    })
+    .on("end", async (rowCount: number) => {
+      logger.info(`Finished parsing ${rowCount} CSV rows`);
+    });
+  const writableStream = getWritableParserAndIndexer(indexConfig, indexName);
+
+  await pipeline(readableStream, parseCsvStream, writableStream);
+
   // roll-over index alias
   if (isReleaseIndexation) {
    await finalizeNewIndexRelease(indexConfig.alias, indexName);
   }
+
+  // Auto refresh is disabled, we manually refresh after each indexation
+  await client.indices.refresh({ index: indexName });
+
   logger.info(`Finished indexing ${indexName} with alias ${indexConfig.alias}`);
   return csvPath;
 };
diff --git a/src/indexation/indexInsee.helpers.ts b/src/indexation/indexInsee.helpers.ts
index ca02833..d6fbf53 100644
--- a/src/indexation/indexInsee.helpers.ts
+++ b/src/indexation/indexInsee.helpers.ts
@@ -94,24 +94,37 @@ const siretWithUniteLegaleFormatter = async (
   if (!body.length) {
     return [];
   }
-  const response = await client.mget({
+  const response = await client.search({
     index: sireneIndexConfig.alias,
     body: {
-      ids: body.map(doc => doc[1].siren)
+      size: 10_000,
+      query: {
+        terms: {
+          siren: body.map(doc => doc[1].siren)
+        }
+      }
     }
   });
-  if (!response.body.docs.length) {
+  if (!response.body.hits.total.value) {
     logger.error(
       `Empty SIRENE data returned from ${extras.sireneIndexConfig.alias}, final data may be corrupted`
     );
   }
-  return response.body.docs.map((sirenDoc, i) => [
-    body[i][0],
-    {
-      ...body[i][1],
-      ...sirenDoc._source
-    }
-  ]);
+
+  const sirenDocsLookup = response.body.hits.hits.reduce((acc, hit) => {
+    acc[hit._source.siren] = hit._source;
+    return acc;
+  }, {});
+
+  return body.map(siretDoc => {
+    return [
+      siretDoc[0],
+      {
+        ...siretDoc[1],
+        ...sirenDocsLookup[siretDoc[1].siren]
+      }
+    ];
+  });
 };
 
 /**
diff --git a/src/indexation/types.ts b/src/indexation/types.ts
index fa3e0aa..6ebdb46 100644
--- a/src/indexation/types.ts
+++ b/src/indexation/types.ts
@@ -28,7 +28,6 @@ export interface ElasticBulkIndexError {
 
 type ElasticBulkPrepayload = {
   index: {
-    _id: string;
     _index: string;
     // Next major ES version won't need _type anymore
     _type?: "_doc";
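
Below is a minimal, self-contained sketch of the buffering and bounded-concurrency pattern the new `bulkIndexByChunks` relies on. It is illustrative only and not part of the patch: `Doc`, `sendBulk`, `CHUNK_SIZE`, `MAX_CONCURRENT_REQUESTS`, `indexByChunks` and `flush` are stand-ins for the real payload types, `request()`, `INDEX_CHUNK_SIZE`, `TD_SIRENE_INDEX_MAX_CONCURRENT_REQUESTS`, `bulkIndexByChunks` and `flushBuffer`.

```ts
type Doc = Record<string, unknown>;

const CHUNK_SIZE = 100;
const MAX_CONCURRENT_REQUESTS = 2;

const inFlight: Promise<void>[] = [];
let buffer: Doc[] = [];

// Stand-in for the real bulk request to Elasticsearch.
async function sendBulk(chunk: Doc[]): Promise<void> {
  console.log(`bulk indexing ${chunk.length} documents`);
}

async function indexByChunks(docs: Doc[]): Promise<void> {
  // Accumulate small writev batches until at least one full chunk is available.
  buffer.push(...docs);
  if (buffer.length < CHUNK_SIZE) {
    return;
  }

  for (let i = 0; i < buffer.length; i += CHUNK_SIZE) {
    // Cap the number of bulk requests in flight.
    if (inFlight.length >= MAX_CONCURRENT_REQUESTS) {
      await Promise.race(inFlight);
    }
    const chunk = buffer.slice(i, i + CHUNK_SIZE);
    const tracked: Promise<void> = sendBulk(chunk).then(() => {
      // Remove ourselves from the queue once the request settles.
      inFlight.splice(inFlight.indexOf(tracked), 1);
    });
    inFlight.push(tracked);
  }
  buffer = [];
}

// Called from the Writable "final" hook: whatever is left in the buffer
// (fewer than CHUNK_SIZE documents) still has to be indexed.
async function flush(): Promise<void> {
  await sendBulk(buffer);
  buffer = [];
}
```

The self-removing promise is what keeps the queue length an accurate count of requests still running, so `Promise.race` only blocks when the concurrency cap is actually reached.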
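The retry path changes shape as well: instead of `logBulkErrorsAndRetry` re-indexing rejected documents one at a time with `client.index`, `getOperationsToRetry` collects the `[action header, document]` pairs that came back with HTTP 429 and feeds them back into `requestBulkIndex`. Here is a rough sketch of that selection step, using simplified stand-in types rather than the real `BulkResponse`; only 429 rejections are re-queued, other item errors are logged but not retried.

```ts
type BulkResponseItem = Record<string, { status: number; error?: unknown }>;
type BulkOperation = Record<string, unknown>;

function operationsToRetry(
  items: BulkResponseItem[],
  flatBulkBody: BulkOperation[]
): BulkOperation[] {
  const retry: BulkOperation[] = [];
  items.forEach((action, k) => {
    for (const opType of Object.keys(action)) {
      // Only HTTP 429 (too many requests) is worth retrying.
      if (action[opType]?.error && action[opType]?.status === 429) {
        // The flat bulk body alternates [action header, document], hence k * 2.
        retry.push(flatBulkBody[k * 2], flatBulkBody[k * 2 + 1]);
      }
    }
  });
  return retry;
}
```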
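Lastly, `siretWithUniteLegaleFormatter` moves from `mget` (which relied on the SIREN being the document `_id`, an assumption this patch removes along with the explicit `_id` in the bulk payload) to a single `terms` search per chunk followed by an in-memory join keyed on `siren`; the `size: 10_000` cap presumably lines up with Elasticsearch's default `index.max_result_window`. A sketch of the join step, with a hypothetical hit type:

```ts
type SirenSource = { siren: string } & Record<string, unknown>;
type SearchHit = { _source: SirenSource };

// Build a siren -> document lookup from the search hits, then enrich each
// SIRET document with the fields of its matching "unité légale".
function mergeSirenData<T extends { siren: string }>(
  siretDocs: T[],
  hits: SearchHit[]
) {
  const lookup: Record<string, SirenSource> = {};
  for (const hit of hits) {
    lookup[hit._source.siren] = hit._source;
  }
  return siretDocs.map(doc => ({ ...doc, ...lookup[doc.siren] }));
}
```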