New import #153

Merged (11 commits) on Jun 28, 2024
17 changes: 3 additions & 14 deletions README.md
@@ -217,22 +217,11 @@ Check out the [Patient-everything operation spec](https://www.hl7.org/fhir/opera

### Bulk Data Access

The server contains functionality for the FHIR Bulk Data Import operation using the [Ping and Pull Approach](https://github.com/smart-on-fhir/bulk-import/blob/master/import-pnp.md).
The server contains functionality for the bulk $import operation as defined by the [Data Exchange for Quality Measures Implementation Guide](https://build.fhir.org/ig/HL7/davinci-deqm/branches/bulk-import-draft/bulk-import.html#data-exchange-using-bulk-import).

To implement a bulk data import operation of all the resources on a FHIR Bulk Data Export server, POST a valid FHIR parameters object to `http://localhost:3000/$import`. Use the parameter format below to specify a bulk export server.
The first step in the bulk $import workflow is to gather data for submission and organize it into a set of inputs that this server will retrieve. This is outlined in detail in the [DEQM IG Data Exchange Using Bulk $import Section](https://build.fhir.org/ig/HL7/davinci-deqm/branches/bulk-import-draft/bulk-import.html#data-exchange-using-bulk-import). _by type_ inputs can be gathered using the [bulk-export-server](https://github.com/projecttacoma/bulk-export-server) $export operation; _by subject_ and hybrid _by type_ and _by subject_ inputs are not yet implemented in this server.

To implement the bulk data import operation from the data requirements for a specific measure, first POST a valid transaction bundle. Then, POST a valid FHIR parameters object to `http://localhost:3000/4_0_1/Measure/$bulk-submit-data` or `http://localhost:3000/4_0_1/Measure/<your-measure-id>/$bulk-submit-data` with the `"prefer": "respond-async"` header populated. This will kick off the "ping and pull" bulk import.

For the bulk data import operation to be successful, the user must specify an export URL to a FHIR Bulk Data Export server in the request body of the FHIR parameters object. For example, in the `parameter` array of the FHIR parameters object, the user can include

```bash
{
"name": "exportUrl",
"valueString": "https://example-export.com"
}
```

with a valid kickoff endpoint URL for the `valueString`.
To kick off a bulk data import operation, POST a valid [Import Manifest](https://build.fhir.org/ig/HL7/davinci-deqm/branches/bulk-import-draft/StructureDefinition-ImportManifest.html) object to `http://localhost:3000/$import`.

The user can check the status of an $import or $bulk-submit-data request by copying the content-location header from the response and sending a GET request to `http://localhost:3000/<content-location-header>`.
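
As an illustration, here is a minimal sketch of kicking off an $import and polling its status using axios (already a project dependency). The manifest file name, the `Prefer: respond-async` header, and the content type are assumptions for this sketch; the manifest itself must conform to the ImportManifest profile linked above.

```javascript
// Minimal sketch only. Assumes `manifest.json` contains a valid ImportManifest
// (see the StructureDefinition linked above) and that the server is running locally.
const axios = require('axios');
const fs = require('fs');

async function kickOffImport() {
  const manifest = JSON.parse(fs.readFileSync('manifest.json', 'utf8'));

  // Kick off the bulk $import (the async preference header is an assumption here)
  const kickoff = await axios.post('http://localhost:3000/$import', manifest, {
    headers: { 'Content-Type': 'application/fhir+json', Prefer: 'respond-async' }
  });

  // The content-location header identifies the bulkstatus endpoint for this request
  const statusUrl = `http://localhost:3000/${kickoff.headers['content-location']}`;
  const status = await axios.get(statusUrl);
  console.log(status.status, status.data);
}

kickOffImport().catch(console.error);
```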

97 changes: 0 additions & 97 deletions package-lock.json

Some generated files are not rendered by default.

3 changes: 1 addition & 2 deletions package.json
@@ -29,7 +29,6 @@
"@projecttacoma/node-fhir-server-core": "^2.2.7",
"axios": "^0.28.0",
"bee-queue": "^1.4.0",
"bulk-data-utilities": "git+https://[email protected]/projecttacoma/bulk-data-utilities",
"cors": "^2.8.5",
"cql-exec-fhir-mongo": "git+https://[email protected]/projecttacoma/cql-exec-fhir-mongo",
"dotenv": "^10.0.0",
@@ -54,4 +53,4 @@
"./test/globalSetup.js"
]
}
}
}
33 changes: 30 additions & 3 deletions src/database/dbOperations.js
@@ -131,7 +131,7 @@
* which can be queried to get updates on the status of the bulk import
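 * @param {Object} body The import manifest from the body of the $import request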
* @returns {string} the id of the inserted client
*/
const addPendingBulkImportRequest = async () => {
const addPendingBulkImportRequest = async body => {
const collection = db.collection('bulkImportStatuses');
const clientId = uuidv4();
const bulkImportClient = {
@@ -146,7 +146,8 @@
totalFileCount: -1,
exportedResourceCount: -1,
totalResourceCount: -1,
failedOutcomes: []
failedOutcomes: [],
importManifest: body
};
logger.debug(`Adding a bulkImportStatus for clientId: ${clientId}`);
await collection.insertOne(bulkImportClient);
@@ -195,6 +196,30 @@
await collection.findOneAndUpdate({ id: clientId }, { $push: { failedOutcomes: { $each: failedOutcomes } } });
};

/**
 * Pushes an array of error messages to an ndjson status entry, to later be converted to
 * OperationOutcomes and made accessible to the requestor via an ndjson file
* @param {String} clientId The id associated with the bulkImport request
* @param {String} fileUrl The url for the resource ndjson
* @param {Array} failedOutcomes An array of strings with messages detailing why the resource failed import
*/
const pushNdjsonFailedOutcomes = async (clientId, fileUrl, failedOutcomes) => {
const collection = db.collection('ndjsonStatuses');
await collection.insertOne({ id: clientId + fileUrl, failedOutcomes: failedOutcomes });
return clientId;
};

/**
* Wrapper for the findResourceById function that only searches ndjsonStatuses db
* @param {string} clientId The id signifying the bulk status request
* @param {string} fileUrl The ndjson fileUrl
* @returns {Object} The ndjson status entry for the passed in clientId and fileUrl
*/
const getNdjsonFileStatus = async (clientId, fileUrl) => {
const status = await findResourceById(clientId + fileUrl, 'ndjsonStatuses');
return status;
};

/**
* Wrapper for the findResourceById function that only searches bulkImportStatuses db
* @param {string} clientId The id signifying the bulk status request
@@ -318,5 +343,7 @@
removeResource,
updateResource,
updateSuccessfulImportCount,
getCountOfCollection
getCountOfCollection,
pushNdjsonFailedOutcomes,
getNdjsonFileStatus
};
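
As a usage sketch (not part of this diff), the two new helpers are intended to work as a pair, keyed on the concatenation of clientId and fileUrl. The require path and example failure messages below are assumptions.

```javascript
// Sketch only: pairing of the new ndjson status helpers.
// Assumes this module path and an already-established Mongo connection.
const { pushNdjsonFailedOutcomes, getNdjsonFileStatus } = require('./src/database/dbOperations');

async function recordAndReadFailures(clientId, fileUrl) {
  // Record per-file failure messages, keyed by clientId + fileUrl
  await pushNdjsonFailedOutcomes(clientId, fileUrl, [
    'Example: resource failed validation',
    'Example: resource is missing a required element'
  ]);

  // Later, the same key retrieves the entry so it can be converted to OperationOutcomes
  const status = await getNdjsonFileStatus(clientId, fileUrl);
  return status.failedOutcomes;
}
```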
49 changes: 8 additions & 41 deletions src/server/importWorker.js
@@ -1,7 +1,6 @@
// Sets up queue which processes the jobs pushed to Redis
// This queue is run in a child process when the server is started
const Queue = require('bee-queue');
const { BulkImportWrappers } = require('bulk-data-utilities');
const { failBulkImportRequest, initializeBulkFileCount } = require('../database/dbOperations');
const mongoUtil = require('../database/connection');
const ndjsonQueue = require('../queue/ndjsonProcessQueue');
@@ -16,16 +15,14 @@ const importQueue = new Queue('import', {
removeOnSuccess: true
});

// This handler pulls jobs from Redis and processes them
importQueue.process(async job => {
// Payload of createJob exists on job.data
const { clientEntry, exportURL, exportType, measureBundle, useTypeFilters } = job.data;
const { clientEntry, inputUrls } = job.data;
logger.info(`import-worker-${process.pid}: Processing Request: ${clientEntry}`);

await mongoUtil.client.connect();
// Call the existing export ndjson function that writes the files
logger.info(`import-worker-${process.pid}: Kicking off export request: ${exportURL}`);
const result = await executePingAndPull(clientEntry, exportURL, exportType, measureBundle, useTypeFilters);
// Kick off the import workflow for the manifest's input files
const result = await executeImportWorkflow(clientEntry, inputUrls);
if (result) {
logger.info(`import-worker-${process.pid}: Enqueued jobs for: ${clientEntry}`);
} else {
@@ -34,50 +31,20 @@ importQueue.process(async job => {
await mongoUtil.client.close();
});

/**
* Calls the bulk-data-utilities wrapper function to get data requirements for the passed in measure, convert those to
* export requests from a bulk export server, then retrieve ndjson from that server and parse it into valid transaction bundles.
* Finally, uploads the resulting transaction bundles to the server and updates the bulkstatus endpoint
* @param {string} clientEntryId The unique identifier which corresponds to the bulkstatus content location for update
* @param {string} exportUrl The url of the bulk export fhir server
* @param {string} exportType The code of the exportType
* @param {Object} measureBundle The measure bundle for which to retrieve data requirements
* @param {boolean} useTypeFilters optional boolean for whether to use type filters for bulk submit data
*/
const executePingAndPull = async (clientEntryId, exportUrl, exportType, measureBundle, useTypeFilters) => {
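/**
 * Initializes the bulk file count for the request and enqueues an ndjson processing job
 * for each input file listed in the import manifest, marking the bulkstatus entry as
 * failed if an error is thrown
 * @param {string} clientEntryId The unique identifier which corresponds to the bulkstatus content location for update
 * @param {Array} inputUrls The input file locations from the import manifest (objects with a url property)
 */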
const executeImportWorkflow = async (clientEntryId, inputUrls) => {
try {
// Default to not use typeFilters for measure specific import
const output = await BulkImportWrappers.executeBulkImport(
exportUrl,
exportType,
measureBundle,
useTypeFilters || false
);

if (output.length === 0) {
throw new Error('Export server failed to export any resources');
}
// Calculate number of resources to export, if available. Otherwise, set to -1.
const resourceCount = output.reduce((resources, fileInfo) => {
if (resources === -1 || fileInfo.count === undefined) {
return -1;
}
return resources + fileInfo.count;
}, 0);

await initializeBulkFileCount(clientEntryId, output.length, resourceCount);
await initializeBulkFileCount(clientEntryId, inputUrls.length, -1);

// Enqueue a parsing job for each ndjson file
await ndjsonQueue.saveAll(
output.map(locationInfo =>
inputUrls.map(url =>
ndjsonQueue.createJob({
fileUrl: locationInfo.url,
fileUrl: url.url,
clientId: clientEntryId,
resourceCount: resourceCount === -1 ? -1 : locationInfo.count
resourceCount: -1
})
)
);

return true;
} catch (e) {
await failBulkImportRequest(clientEntryId, e);
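
For reference, the following sketch (not part of this diff) shows the job payload shape the reworked worker now expects: `clientEntry` plus an `inputUrls` array of objects with a `url` property. The Redis settings are assumptions; the queue name matches the worker's.

```javascript
// Sketch only: enqueuing an import job with the payload shape importQueue.process reads.
const Queue = require('bee-queue');

const importQueue = new Queue('import', { removeOnSuccess: true });

async function enqueueImport(clientEntry, manifestInputs) {
  // inputUrls must be objects with a `url` property, matching the worker's `url.url` access
  const job = importQueue.createJob({
    clientEntry,
    inputUrls: manifestInputs.map(input => ({ url: input.url }))
  });
  await job.save();
  return job.id;
}
```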