Skip to content

Commit

Permalink
Fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
Riron committed Mar 5, 2024
1 parent 88ba9df commit 9040c6b
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 19 deletions.
1 change: 1 addition & 0 deletions .github/workflows/build-and-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,6 @@ jobs:
curl -fsSL "$ELASTICSEARCH_URL/_cat/health?h=status"
- name: Run tests
env:
INDEX_CHUNK_SIZE: 100
ELASTICSEARCH_URL: http://localhost:${{ job.services.elasticsearch.ports[9200] }}
run: npm test -- --forceExit
4 changes: 0 additions & 4 deletions src/commands/__tests__/index.integration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ describe("Perform indexation", () => {
}
}
};
// wait for the ES refresh cycle of 1sec
await new Promise(resolve => setTimeout(resolve, 1000));

await elasticSearchClient.search(searchRequest).then(r => {
if (r.body.timed_out) {
Expand Down Expand Up @@ -97,8 +95,6 @@ describe("Perform indexation", () => {
}
}
};
// wait for the ES refresh cycle of 1sec
await new Promise(resolve => setTimeout(resolve, 1000));

await elasticSearchClient.search(searchRequest).then(r => {
if (r.body.timed_out) {
Expand Down
22 changes: 7 additions & 15 deletions src/indexation/__tests__/bulkIndex.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,16 @@ describe("bulkIndexByChunks", () => {
dataFormatterFn: dataFormatterFnMock
};
const indexNameMock = "test_index";
const CHUNK_SIZE = 100;
const CHUNK_SIZE = parseInt(`${process.env.INDEX_CHUNK_SIZE}`, 10) || 100;

beforeEach(() => {
process.env.TD_SIRENE_INDEX_MAX_CONCURRENT_REQUESTS = "1";
process.env.INDEX_CHUNK_SIZE = `${CHUNK_SIZE}`;
esClientMock.mockReset();
dataFormatterFnMock.mockReset();
esClientMock.mockImplementation(() => Promise.resolve());
dataFormatterFnMock.mockImplementation((body, _) => Promise.resolve(body));
});

test("Should immediately send a request if body size is less than CHUNK_SIZE", async () => {
// One document to index
const bodyMock: ElasticBulkNonFlatPayload = [
[{ index: { _index: indexNameMock } }, { field: "value1" }]
];
await bulkIndexByChunks(bodyMock, indexConfigMock, indexNameMock);
expect(esClientMock).toHaveBeenCalledTimes(1);
});

test("Should slice the body into chunks and send each as a separate request", async () => {
// One Chunk + 1
const bodyMock: ElasticBulkNonFlatPayload = Array(CHUNK_SIZE + 1)
Expand Down Expand Up @@ -72,10 +62,12 @@ describe("bulkIndexByChunks", () => {
});

test("Should call dataFormatterFn if it is a function", async () => {
// One document to index
const bodyMock: ElasticBulkNonFlatPayload = [
[{ index: { _index: indexNameMock } }, { my_document_field: "value1" }]
];
const bodyMock: ElasticBulkNonFlatPayload = Array(CHUNK_SIZE + 1)
.fill(0)
.map((_, i) => [
{ index: { _id: `${i}`, _index: indexNameMock } },
{ my_document_field: `value${i}` }
]);
await bulkIndexByChunks(bodyMock, indexConfigMock, indexNameMock);
expect(indexConfigMock.dataFormatterFn).toHaveBeenCalled();
});
Expand Down
12 changes: 12 additions & 0 deletions src/indexation/elasticSearch.helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,13 @@ export const bulkIndexByChunks = async (
bodyBuffer = [];
};

export const flushBuffer = async (
indexConfig: IndexProcessConfig,
indexName: string
) => {
await request(indexName, indexConfig, bodyBuffer);
};

/**
* Writable stream that parses CSV to an ES bulk body
*/
Expand Down Expand Up @@ -355,6 +362,11 @@ const getWritableParserAndIndexer = (
)
.then(() => next())
.catch(err => next(err));
},
final: async callback => {
// Because we buffer chunks, we need to flush it at the end
await flushBuffer(indexConfig, indexName);
callback();
}
});

Expand Down

0 comments on commit 9040c6b

Please sign in to comment.