New import #153

Merged: 11 commits, Jun 28, 2024
97 changes: 0 additions & 97 deletions package-lock.json

Generated file; diff not rendered.

3 changes: 1 addition & 2 deletions package.json
@@ -29,7 +29,6 @@
"@projecttacoma/node-fhir-server-core": "^2.2.7",
"axios": "^0.28.0",
"bee-queue": "^1.4.0",
"bulk-data-utilities": "git+https://[email protected]/projecttacoma/bulk-data-utilities",
"cors": "^2.8.5",
"cql-exec-fhir-mongo": "git+https://[email protected]/projecttacoma/cql-exec-fhir-mongo",
"dotenv": "^10.0.0",
@@ -54,4 +53,4 @@
"./test/globalSetup.js"
]
}
-}
+}
33 changes: 30 additions & 3 deletions src/database/dbOperations.js
@@ -131,7 +131,7 @@
* which can be queried to get updates on the status of the bulk import
* @returns {string} the id of the inserted client
*/
-const addPendingBulkImportRequest = async () => {
+const addPendingBulkImportRequest = async body => {
const collection = db.collection('bulkImportStatuses');
const clientId = uuidv4();
const bulkImportClient = {
@@ -146,7 +146,8 @@
totalFileCount: -1,
exportedResourceCount: -1,
totalResourceCount: -1,
-failedOutcomes: []
+failedOutcomes: [],
+importManifest: body
};
logger.debug(`Adding a bulkImportStatus for clientId: ${clientId}`);
await collection.insertOne(bulkImportClient);
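Note (not part of the diff): with the new `body` parameter, callers now pass the raw import request body through so it is persisted as `importManifest` on the status document. A minimal caller sketch; the handler name, route shape, and response details are assumptions for illustration:

// Hypothetical caller: persist the import manifest alongside the pending status.
const { addPendingBulkImportRequest } = require('../database/dbOperations');

const bulkImport = async (req, res) => {
  // req.body is assumed to be the import manifest supplied by the client
  const clientEntry = await addPendingBulkImportRequest(req.body);
  // Point the requestor at the bulkstatus endpoint for this clientId
  res.status(202).set('Content-Location', `/4_0_1/bulkstatus/${clientEntry}`).send();
};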
@@ -195,6 +196,30 @@
await collection.findOneAndUpdate({ id: clientId }, { $push: { failedOutcomes: { $each: failedOutcomes } } });
};

+/**
+ * Pushes an array of error messages to an ndjson status entry, to later be converted to
+ * OperationOutcomes and made accessible to the requestor via an ndjson file
+ * @param {String} clientId The id associated with the bulkImport request
+ * @param {String} fileUrl The url for the resource ndjson
+ * @param {Array} failedOutcomes An array of strings with messages detailing why resources failed to import
+ */
+const pushNdjsonFailedOutcomes = async (clientId, fileUrl, failedOutcomes) => {
+const collection = db.collection('ndjsonStatuses');
+await collection.insertOne({ id: clientId + fileUrl, failedOutcomes: failedOutcomes });
+return clientId;
+};

+/**
+ * Wrapper for the findResourceById function that only searches the ndjsonStatuses db
+ * @param {string} clientId The id signifying the bulk status request
+ * @param {string} fileUrl The ndjson fileUrl
+ * @returns {Object} The ndjson status entry for the passed-in clientId and fileUrl
+ */
+const getNdjsonFileStatus = async (clientId, fileUrl) => {
+const status = await findResourceById(clientId + fileUrl, 'ndjsonStatuses');
+return status;
+};

/**
* Wrapper for the findResourceById function that only searches bulkImportStatuses db
* @param {string} clientId The id signifying the bulk status request
@@ -318,5 +343,7 @@
removeResource,
updateResource,
updateSuccessfulImportCount,
-getCountOfCollection
+getCountOfCollection,
+pushNdjsonFailedOutcomes,
+getNdjsonFileStatus
};
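For context on how these per-file entries might be consumed (an assumption, not shown in this PR): the composite `clientId + fileUrl` id lets a status endpoint look up one file's failures and serialize them as OperationOutcome ndjson. A hedged consumer sketch:

// Hypothetical consumer: render one file's failed outcomes as OperationOutcome ndjson.
// The function name and OperationOutcome mapping are assumptions for illustration.
const { getNdjsonFileStatus } = require('../database/dbOperations');

const buildNdjsonOutcomes = async (clientId, fileUrl) => {
  const status = await getNdjsonFileStatus(clientId, fileUrl);
  if (!status) return '';
  // One OperationOutcome per line, per the ndjson convention
  return status.failedOutcomes
    .map(diagnostics =>
      JSON.stringify({
        resourceType: 'OperationOutcome',
        issue: [{ severity: 'error', code: 'processing', diagnostics }]
      })
    )
    .join('\n');
};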
50 changes: 9 additions & 41 deletions src/server/importWorker.js
@@ -1,7 +1,6 @@
// Sets up queue which processes the jobs pushed to Redis
// This queue is run in a child process when the server is started
const Queue = require('bee-queue');
-const { BulkImportWrappers } = require('bulk-data-utilities');
const { failBulkImportRequest, initializeBulkFileCount } = require('../database/dbOperations');
const mongoUtil = require('../database/connection');
const ndjsonQueue = require('../queue/ndjsonProcessQueue');
@@ -16,16 +15,15 @@ const importQueue = new Queue('import', {
removeOnSuccess: true
});

// This handler pulls down the jobs on Redis to handle
importQueue.process(async job => {
// Payload of createJob exists on job.data
-const { clientEntry, exportURL, exportType, measureBundle, useTypeFilters } = job.data;
+const { clientEntry, inputUrls } = job.data;
logger.info(`import-worker-${process.pid}: Processing Request: ${clientEntry}`);

await mongoUtil.client.connect();
-// Call the existing export ndjson function that writes the files
-logger.info(`import-worker-${process.pid}: Kicking off export request: ${exportURL}`);
-const result = await executePingAndPull(clientEntry, exportURL, exportType, measureBundle, useTypeFilters);
+// Call function to get the ndjson files
+const result = await executeNewImportWorkflow(clientEntry, inputUrls);
if (result) {
logger.info(`import-worker-${process.pid}: Enqueued jobs for: ${clientEntry}`);
} else {
@@ -34,50 +32,20 @@ importQueue.process(async job => {
await mongoUtil.client.close();
});

-/**
- * Calls the bulk-data-utilities wrapper function to get data requirements for the passed in measure, convert those to
- * export requests from a bulk export server, then retrieve ndjson from that server and parse it into valid transaction bundles.
- * Finally, uploads the resulting transaction bundles to the server and updates the bulkstatus endpoint
- * @param {string} clientEntryId The unique identifier which corresponds to the bulkstatus content location for update
- * @param {string} exportUrl The url of the bulk export fhir server
- * @param {string} exportType The code of the exportType
- * @param {Object} measureBundle The measure bundle for which to retrieve data requirements
- * @param {boolean} useTypeFilters optional boolean for whether to use type filters for bulk submit data
- */
-const executePingAndPull = async (clientEntryId, exportUrl, exportType, measureBundle, useTypeFilters) => {
+const executeNewImportWorkflow = async (clientEntryId, inputUrls) => {
try {
-// Default to not use typeFilters for measure specific import
-const output = await BulkImportWrappers.executeBulkImport(
-exportUrl,
-exportType,
-measureBundle,
-useTypeFilters || false
-);
-
-if (output.length === 0) {
-throw new Error('Export server failed to export any resources');
-}
-// Calculate number of resources to export, if available. Otherwise, set to -1.
-const resourceCount = output.reduce((resources, fileInfo) => {
-if (resources === -1 || fileInfo.count === undefined) {
-return -1;
-}
-return resources + fileInfo.count;
-}, 0);
-
-await initializeBulkFileCount(clientEntryId, output.length, resourceCount);
+await initializeBulkFileCount(clientEntryId, inputUrls.length, -1);

// Enqueue a parsing job for each ndjson file
await ndjsonQueue.saveAll(
-output.map(locationInfo =>
+inputUrls.map(url =>
ndjsonQueue.createJob({
-fileUrl: locationInfo.url,
+fileUrl: url.url,
clientId: clientEntryId,
-resourceCount: resourceCount === -1 ? -1 : locationInfo.count
+resourceCount: -1
})
)
);

return true;
} catch (e) {
await failBulkImportRequest(clientEntryId, e);
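For reference, the new payload shape implies the producer now enqueues manifest input entries directly; `url.url` above suggests each entry is an object with a `url` property, as in a bulk import manifest's `input` array. A producer sketch under those assumptions (the module setup and manifest shape are not shown in this PR):

// Hypothetical producer: enqueue an import job with the new { clientEntry, inputUrls } payload.
const Queue = require('bee-queue');
const importQueue = new Queue('import'); // same queue name the worker listens on

const enqueueImportJob = async (clientEntry, manifest) => {
  // Assumed manifest shape: manifest.input = [{ type: 'Patient', url: 'https://example.com/Patient.ndjson' }, ...]
  const job = importQueue.createJob({ clientEntry, inputUrls: manifest.input });
  await job.save();
};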
23 changes: 19 additions & 4 deletions src/server/ndjsonWorker.js
@@ -2,7 +2,7 @@
// This queue is run in a child process when the server is started
const Queue = require('bee-queue');
const axios = require('axios');
-const { updateResource, pushBulkFailedOutcomes } = require('../database/dbOperations');
+const { updateResource, pushBulkFailedOutcomes, pushNdjsonFailedOutcomes } = require('../database/dbOperations');
const mongoUtil = require('../database/connection');
const { checkSupportedResource } = require('../util/baseUtils');
const logger = require('./logger');
@@ -39,7 +39,18 @@ ndjsonWorker.process(async job => {
logger.info(`ndjson-worker-${process.pid}: processing ${fileName}`);

await mongoUtil.client.connect();
-const ndjsonResources = await retrieveNDJSONFromLocation(fileUrl);
+
+let ndjsonResources;
+try {
+ndjsonResources = await retrieveNDJSONFromLocation(fileUrl);
+} catch (e) {
+const outcome = [`ndjson retrieval of ${fileUrl} failed with message: ${e.message}`];
+
+await pushNdjsonFailedOutcomes(clientId, fileUrl, outcome);
+await pushBulkFailedOutcomes(clientId, outcome);
+process.send({ clientId, resourceCount: 0, successCount: 0 });
+return;
+}

const insertions = ndjsonResources
.trim()
@@ -68,15 +79,19 @@ ndjsonWorker.process(async job => {
const outcomes = await Promise.allSettled(insertions);

const failedOutcomes = outcomes.filter(outcome => outcome.status === 'rejected');
-const succesfulOutcomes = outcomes.filter(outcome => outcome.status === 'fulfilled');
+const successfulOutcomes = outcomes.filter(outcome => outcome.status === 'fulfilled');

const outcomeData = [];

failedOutcomes.forEach(out => {
outcomeData.push(out.reason.message);
});

+// keep track of failed outcomes for individual ndjson files
+await pushNdjsonFailedOutcomes(clientId, fileUrl, outcomeData);

await pushBulkFailedOutcomes(clientId, outcomeData);
-const successCount = succesfulOutcomes.length;
+const successCount = successfulOutcomes.length;
logger.info(`ndjson-worker-${process.pid}: processed ${fileName}`);

process.send({ clientId, resourceCount, successCount });
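As context for the new try/catch, `retrieveNDJSONFromLocation` is defined elsewhere in this file and untouched by the diff; it presumably reduces to an HTTP GET via the axios import above. A minimal sketch under that assumption:

// Assumed shape of the helper wrapped by the new try/catch above.
// Requesting text keeps axios from JSON-parsing the newline-delimited body.
const retrieveNDJSONFromLocation = async url => {
  const response = await axios.get(url, { responseType: 'text' });
  // Raw ndjson string; network/HTTP errors propagate to the caller's catch block
  return response.data;
};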