Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

port download code to use the batch endpoint #513

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .jshintrc
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"esversion": 6,
"esversion": 8,
"node": true,
"curly": true,
"eqeqeq": true,
Expand Down
27 changes: 18 additions & 9 deletions lib/parameters.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
var fs = require( 'fs' );
var util = require( 'util' );
var glob = require( 'glob' );
var path = require( 'path' );
var _ = require('lodash');
const fs = require('fs');
const util = require('util');
const glob = require('glob');
const path = require('path');
const _ = require('lodash');
const minimist = require('minimist');

var minimist = require( 'minimist' );

var peliasConfig = require( 'pelias-config' ).generate();
const peliasConfig = require('pelias-config').generate();
const OpenAddressesAPI = require('../utils/OpenAddressesAPI');

/**
* Interprets the command-line arguments passed to the script.
Expand Down Expand Up @@ -99,7 +99,16 @@ function getFullFileList(peliasConfig, args) {
return glob.sync( args.dirPath + '/**/*.{csv,geojson}' );
} else {
// otherwise return the requested files with full path
return files.map(function(file) {
return files.map(file => {

// normalize source
const source = OpenAddressesAPI.normalize(file);

// search for files matching this source id, ending in either .geojson or .csv
const found = glob.sync(`${source}.{csv,geojson}`, { cwd: args.dirPath, absolute: true });
if (!_.isEmpty(found)) { return _.last(found); } // results are sorted, prefer .geojson

// no matching files were found, return a non-matching absolute path
return path.join(args.dirPath, file);
});
}
Expand Down
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"dependencies": {
"@hapi/joi": "^16.0.1",
"async": "^3.1.0",
"axios": "^1.2.2",
"bottleneck": "^2.19.5",
"combined-stream": "^1.0.7",
"csv-parse": "^5.0.3",
Expand All @@ -17,7 +18,7 @@
"lodash": "^4.16.0",
"minimist": "^1.2.0",
"pelias-blacklist-stream": "^1.0.0",
"pelias-config": "^4.12.0",
"pelias-config": "^5.2.0",
"pelias-dbclient": "^2.13.0",
"pelias-logger": "^1.2.1",
"pelias-model": "^9.2.0",
Expand Down
3 changes: 2 additions & 1 deletion schema.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ module.exports = Joi.object().keys({
dataHost: Joi.string(),
s3Options: Joi.string(),
adminLookup: Joi.boolean(),
missingFilesAreFatal: Joi.boolean().default(false).truthy('yes').falsy('no')
missingFilesAreFatal: Joi.boolean().default(false).truthy('yes').falsy('no'),
token: Joi.string().required(true),
}).unknown(false)
}).unknown(true)
}).unknown(true);
9 changes: 7 additions & 2 deletions test/schema.js
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ tape( 'unknown config fields should throw error', function(test) {
imports: {
openaddresses: {
datapath: 'this is the datapath',
token: 'abc',
unknown: 'value'
}
}
Expand All @@ -153,11 +154,12 @@ tape( 'unknown config fields should throw error', function(test) {

});

tape( 'configuration with only datapath should not throw error', function(test) {
tape( 'configuration with only datapath & token should not throw error', function(test) {
const config = {
imports: {
openaddresses: {
datapath: 'this is the datapath'
datapath: 'this is the datapath',
token: 'abc'
}
}
};
Expand All @@ -172,6 +174,7 @@ tape( 'valid configuration should not throw error', function(test) {
imports: {
openaddresses: {
datapath: 'this is the datapath',
token: 'abc',
adminLookup: false,
files: ['file 1', 'file 2']
}
Expand All @@ -188,6 +191,7 @@ tape( 'unknown children of imports should not throw error', function(test) {
imports: {
openaddresses: {
datapath: 'this is the datapath',
token: 'abc',
adminLookup: false,
files: ['file 1', 'file 2']
},
Expand All @@ -205,6 +209,7 @@ tape( 'unknown children of root should not throw error', function(test) {
imports: {
openaddresses: {
datapath: 'this is the datapath',
token: 'abc',
adminLookup: false,
files: ['file 1', 'file 2']
}
Expand Down
52 changes: 52 additions & 0 deletions utils/OpenAddressesAPI.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
const _ = require('lodash');
const axios = require('axios');
const config = require('pelias-config');
const logger = require('pelias-logger').get('openaddresses');
const HOST = 'https://batch.openaddresses.io';

class OpenAddressesAPI {
constructor() {
this.config = _.get(config.generate(), 'imports.openaddresses', {});
this.token = _.get(this.config, 'token');
}

// remove file extensions from 'source'
static normalize(source) {
if (!_.isString(source)) { return source; }
const norm = source.replace(/\.[^/.]+$/, '');

// source definitions previously required a file extension.
// please remove file extensions from your ~/pelias.json file
// to silence these warning messages.
if (source !== norm) {
logger.warn(`source definitions no longer require a file extension '${source}'`);
}

return norm;
}

// return the http url for a specific job id
static url(job) {
return `${HOST}/api/job/${job}/output/source.geojson.gz`;
}

// if the 'validated' mode is enabled (for financial supporters only)
isValidatedModeEnabled() {
return _.get(this.config, 'validated') === true;
}

async lookup(source) {
// support the 'validated' property for financial supporters
const params = {
source,
layer: 'addresses',
validated: this.isValidatedModeEnabled() ? 'true' : 'false'
};

// request extended info and return the first result
const versions = await axios.get(`${HOST}/api/data`, { params });
return _.isArray(versions.data) && !_.isEmpty(versions.data) ? _.head(versions.data) : {};
}
}

module.exports = OpenAddressesAPI;
124 changes: 79 additions & 45 deletions utils/download_filtered.js
Original file line number Diff line number Diff line change
@@ -1,120 +1,154 @@
const child_process = require('child_process');
const config = require( 'pelias-config' ).generate();
const config = require('pelias-config').generate();
const async = require('async');
const fs = require('fs-extra');
const path = require('path');
const temp = require('temp');
const logger = require('pelias-logger').get('openaddresses-download');
const Bottleneck = require('bottleneck/es5');

const OpenAddressesAPI = require('./OpenAddressesAPI');
const oa = new OpenAddressesAPI();

function downloadFiltered(config, callback) {
const targetDir = config.imports.openaddresses.datapath;
const errorsFatal = config.get('imports.openaddresses.missingFilesAreFatal');

fs.ensureDir(targetDir, (err) => {
fs.ensureDir(targetDir, async (err) => {
if (err) {
logger.error(`error making directory ${targetDir}`, err);
return callback(err);
}

const files = getFiles(config, targetDir, callback);
logger.info(`Attempting to download selected data files: ${files.map(file => file.csv)}`);
// validate sources
const files = config.get('imports.openaddresses.files', []);
const sources = await getSources(files);
const validSources = sources.filter(source => source.url);

// respect 'imports.openaddresses.missingFilesAreFatal' setting
if (errorsFatal && (sources.length !== validSources.length)) {
callback(sources.find(source => source.error)); // return first error
return;
}

logger.info(`Attempting to download selected data sources: ${sources.map(source => source.id)}`);

// limit requests to avoid being banned by openaddresses.io
// current policy is 10 request per minute
// https://github.com/pelias/openaddresses/issues/433#issuecomment-527383976
// @todo: contact OA team to check if this is still required with the batch. endpoint?
const options = {
maxConcurrent: 1,
minTime: 6000
};
const limiter = new Bottleneck(options);
const callbackOnLastOne = () => {
const done = () => {
if (limiter.empty()) {
callback();
}
};
files.map(file => {
limiter.submit(downloadSource, targetDir, file, callbackOnLastOne);
validSources.map(source => {
limiter.submit(downloadSource, targetDir, source, done);
});
process.on('SIGINT', () => {
limiter.stop({dropWaitingJobs: true});
limiter.stop({ dropWaitingJobs: true });
process.exit();
});
});

}

function getFiles(config, targetDir, main_callback){
const errorsFatal = config.get('imports.openaddresses.missingFilesAreFatal');
const files = config.imports.openaddresses.files;
files.forEach(file => {
// sources MUST end with '.csv'
if( !file.endsWith('.csv') ){
const msg = `invalid source '${file}': MUST end with '.csv'`;
logger.warn(msg);

// respect 'imports.openaddresses.missingFilesAreFatal' setting
return main_callback(errorsFatal ? msg : null);
async function getSources(files) {
return await Promise.all(files.map(async file => {

// normalize source
let id = OpenAddressesAPI.normalize(file);

// lookup the source using the OpenAddresses API
// to find the most current job id and ensure validity
const version = await oa.lookup(id);
const valid = (version && version.job);

// invalid source
if (!valid) {
return { id, error: `invalid source '${file}'` };
}
});
return files.map(file => {
const source = file.replace('.csv', '.zip');
const name = file.replace('.csv', '').replace(/\//g,'-');
return {
csv: file,
url: `https://results.openaddresses.io/latest/run/${source}`,
zip: temp.path({prefix: name, dir: targetDir, suffix: '.zip'})
};
});

// valid source
return { id, url: OpenAddressesAPI.url(version.job) };
}));
}

function downloadSource(targetDir, file, main_callback) {
function downloadSource(targetDir, source, done) {

const errorsFatal = config.get('imports.openaddresses.missingFilesAreFatal');
const token = config.get('imports.openaddresses.token');
const referer = config.get('imports.openaddresses.dataReferer') || 'https://pelias-results.openaddresses.io';
logger.info(`Downloading ${file.csv}`);
logger.info(`Downloading ${source.id}`);

const outFile = path.join(targetDir, `${source.id}.geojson`);
const tmpFile = temp.path({
prefix: source.id.replace(new RegExp(path.sep, 'g'), '-'),
dir: targetDir,
suffix: '.gz'
});

async.series(
[
// download the zip file into the temp directory
// download the compressed file into the temp directory
(callback) => {
logger.debug(`downloading ${file.url}`);
logger.debug(`downloading ${source.url}`);
const flags = [
'--request GET', // HTTP GET
'--silent', // be quiet
'--location', // follow redirects
'--fail', // exit with a non-zero code for >=400 responses
'--write-out "%{http_code}"', // print status code to STDOUT
`--referer ${referer}`, // set referer header
`--output ${file.zip}`, // set output filepath
`--output ${tmpFile}`, // set output filepath
'--retry 5', // retry this number of times before giving up
'--retry-connrefused', // consider ECONNREFUSED as a transient error
'--retry-delay 5' // sleep this many seconds between retry attempts
'--retry-delay 5', // sleep this many seconds between retry attempts
`-H 'Authorization: Bearer ${token}'` // authorization token
].join(' ');

// the `--fail*` flags cause an error to be returned as the first arg with `error.code`
// as the process exit status, the `-w "%{http_code}"` flag writes the HTTP status to STDOUT.
child_process.exec(`curl ${flags} ${file.url}`, (error, stdout) => {
child_process.exec(`curl ${flags} ${source.url}`, (error, stdout) => {
if (!error) { return callback(); }

// provide a more user-friendly error message
error.message = `cURL request failed, HTTP status: ${stdout}, exit code: ${error.code}`;
callback(error);
});
},
// unzip file into target directory
// decompress file into target directory
(callback) => {
logger.debug(`unzipping ${file.zip} to ${targetDir}`);
child_process.exec(`unzip -o -qq -d ${targetDir} ${file.zip}`, callback);
logger.debug(`decompress ${tmpFile} to ${outFile}`);
child_process.exec(`
mkdir -p ${path.dirname(outFile)};
gzip -d < ${tmpFile} > ${outFile};
`, (error, stdout) => {
if (!error) { return callback(); }

// provide a more user-friendly error message
error.message = `decompress failed, ${stdout}`;
callback(error);
});
},
// delete the temp downloaded zip file
fs.remove.bind(null, file.zip)
],
function(err) {
(err) => {
if (err) {
logger.warn(`failed to download ${file.url}: ${err}`);
logger.warn(`failed to download ${source.url}: ${err}`);
}

// ensure temp files are cleaned up
if (fs.existsSync(tmpFile)) { fs.unlinkSync(tmpFile); }

// honour 'imports.openaddresses.missingFilesAreFatal' setting
main_callback(errorsFatal ? err : null);
});
done(errorsFatal ? err : null);
}
);
}

module.exports = downloadFiltered;