diff --git a/.github/workflows/build-and-test.yml b/.github/workflows/build-and-test.yml index 8d6963b..583fe06 100644 --- a/.github/workflows/build-and-test.yml +++ b/.github/workflows/build-and-test.yml @@ -35,5 +35,6 @@ jobs: curl -fsSL "$ELASTICSEARCH_URL/_cat/health?h=status" - name: Run tests env: + INDEX_CHUNK_SIZE: 100 ELASTICSEARCH_URL: http://localhost:${{ job.services.elasticsearch.ports[9200] }} run: npm test -- --forceExit diff --git a/src/commands/__tests__/index.integration.ts b/src/commands/__tests__/index.integration.ts index 573f356..296e522 100644 --- a/src/commands/__tests__/index.integration.ts +++ b/src/commands/__tests__/index.integration.ts @@ -61,8 +61,6 @@ describe("Perform indexation", () => { } } }; - // wait for the ES refresh cycle of 1sec - await new Promise(resolve => setTimeout(resolve, 1000)); await elasticSearchClient.search(searchRequest).then(r => { if (r.body.timed_out) { @@ -97,8 +95,6 @@ describe("Perform indexation", () => { } } }; - // wait for the ES refresh cycle of 1sec - await new Promise(resolve => setTimeout(resolve, 1000)); await elasticSearchClient.search(searchRequest).then(r => { if (r.body.timed_out) { diff --git a/src/indexation/__tests__/bulkIndex.test.ts b/src/indexation/__tests__/bulkIndex.test.ts index 830706c..9e9ae3e 100644 --- a/src/indexation/__tests__/bulkIndex.test.ts +++ b/src/indexation/__tests__/bulkIndex.test.ts @@ -24,26 +24,16 @@ describe("bulkIndexByChunks", () => { dataFormatterFn: dataFormatterFnMock }; const indexNameMock = "test_index"; - const CHUNK_SIZE = 100; + const CHUNK_SIZE = parseInt(`${process.env.INDEX_CHUNK_SIZE}`, 10) || 100; beforeEach(() => { process.env.TD_SIRENE_INDEX_MAX_CONCURRENT_REQUESTS = "1"; - process.env.INDEX_CHUNK_SIZE = `${CHUNK_SIZE}`; esClientMock.mockReset(); dataFormatterFnMock.mockReset(); esClientMock.mockImplementation(() => Promise.resolve()); dataFormatterFnMock.mockImplementation((body, _) => Promise.resolve(body)); }); - test("Should immediately send a request if body size is less than CHUNK_SIZE", async () => { - // One document to index - const bodyMock: ElasticBulkNonFlatPayload = [ - [{ index: { _index: indexNameMock } }, { field: "value1" }] - ]; - await bulkIndexByChunks(bodyMock, indexConfigMock, indexNameMock); - expect(esClientMock).toHaveBeenCalledTimes(1); - }); - test("Should slice the body into chunks and send each as a separate request", async () => { // One Chunk + 1 const bodyMock: ElasticBulkNonFlatPayload = Array(CHUNK_SIZE + 1) @@ -72,10 +62,12 @@ describe("bulkIndexByChunks", () => { }); test("Should call dataFormatterFn if it is a function", async () => { - // One document to index - const bodyMock: ElasticBulkNonFlatPayload = [ - [{ index: { _index: indexNameMock } }, { my_document_field: "value1" }] - ]; + const bodyMock: ElasticBulkNonFlatPayload = Array(CHUNK_SIZE + 1) + .fill(0) + .map((_, i) => [ + { index: { _id: `${i}`, _index: indexNameMock } }, + { my_document_field: `value${i}` } + ]); await bulkIndexByChunks(bodyMock, indexConfigMock, indexNameMock); expect(indexConfigMock.dataFormatterFn).toHaveBeenCalled(); }); diff --git a/src/indexation/elasticSearch.helpers.ts b/src/indexation/elasticSearch.helpers.ts index 3cab783..9bb9650 100644 --- a/src/indexation/elasticSearch.helpers.ts +++ b/src/indexation/elasticSearch.helpers.ts @@ -308,6 +308,13 @@ export const bulkIndexByChunks = async ( bodyBuffer = []; }; +export const flushBuffer = async ( + indexConfig: IndexProcessConfig, + indexName: string +) => { + await request(indexName, indexConfig, bodyBuffer); +}; + /** * Writable stream that parses CSV to an ES bulk body */ @@ -355,6 +362,11 @@ const getWritableParserAndIndexer = ( ) .then(() => next()) .catch(err => next(err)); + }, + final: async callback => { + // Because we buffer chunks, we need to flush it at the end + await flushBuffer(indexConfig, indexName); + callback(); } });