diff --git a/src/endpoint/endpoint.js b/src/endpoint/endpoint.js index 3025bc05f1..c4c817500e 100755 --- a/src/endpoint/endpoint.js +++ b/src/endpoint/endpoint.js @@ -118,7 +118,17 @@ async function main(options = {}) { // the primary just forks and returns, workers will continue to serve fork_count = options.forks ?? config.ENDPOINT_FORKS; const metrics_port = options.metrics_port || config.EP_METRICS_SERVER_PORT; - if (fork_utils.start_workers(metrics_port, fork_count)) return; + /** + * Please notice that we can run the main in 2 states: + * 1. Only the primary process runs the main (fork is 0 or undefined) - everything that + * is implemented here would be run by this process. + * 2. A primary process with multiple forks (IMPORTANT) - if there is implementation that + * in only relevant to the primary process it should be implemented in + * fork_utils.start_workers because the primary process returns after start_workers + * and the forks will continue executing the code lines in this function + * */ + const is_workers_started_from_primary = await fork_utils.start_workers(metrics_port, fork_count); + if (is_workers_started_from_primary) return; const endpoint_group_id = process.env.ENDPOINT_GROUP_ID || 'default-endpoint-group'; diff --git a/src/manage_nsfs/diagnose.js b/src/manage_nsfs/diagnose.js index 56d65266d2..2e87015b31 100644 --- a/src/manage_nsfs/diagnose.js +++ b/src/manage_nsfs/diagnose.js @@ -59,9 +59,17 @@ async function gather_metrics() { const buffer = await buffer_utils.read_stream_join(res); const body = buffer.toString('utf8'); metrics_output = JSON.parse(body); + if (!metrics_output) throw new Error('received empty metrics response', { cause: res.statusCode }); + write_stdout_response(ManageCLIResponse.MetricsStatus, metrics_output); + } else if (res.statusCode >= 500 && res.rawHeaders.includes('application/json')) { + const buffer = await buffer_utils.read_stream_join(res); + const body = buffer.toString('utf8'); + const error_output = JSON.parse(body); + if (!error_output) throw new Error('received empty metrics response', { cause: res.statusCode }); + throw_cli_error({ ...ManageCLIError.MetricsStatusFailed, ...error_output }); + } else { + throw new Error('received empty metrics response', { cause: res.statusCode }); } - if (!metrics_output) throw new Error('recieved empty metrics response', { cause: res.statusCode }); - write_stdout_response(ManageCLIResponse.MetricsStatus, metrics_output); } catch (err) { dbg.warn('could not receive metrics response', err); throw_cli_error({ ...ManageCLIError.MetricsStatusFailed, cause: err?.errors?.[0] || err }); diff --git a/src/server/analytic_services/prometheus_reporting.js b/src/server/analytic_services/prometheus_reporting.js index 164e4d502b..caee6bf86b 100644 --- a/src/server/analytic_services/prometheus_reporting.js +++ b/src/server/analytic_services/prometheus_reporting.js @@ -61,12 +61,7 @@ async function start_server( const server = http.createServer(async (req, res) => { // Serve all metrics on the root path for system that do have one or more fork running. if (fork_enabled) { - const metrics = await aggregatorRegistry.clusterMetrics(); - if (req.url === '' || req.url === '/') { - res.writeHead(200, { 'Content-Type': aggregatorRegistry.contentType }); - res.end(metrics); - return; - } + // we would like this part to be first as clusterMetrics might fail. if (req.url === '/metrics/nsfs_stats') { res.writeHead(200, { 'Content-Type': 'text/plain' }); const nsfs_report = { @@ -77,6 +72,24 @@ async function start_server( res.end(JSON.stringify(nsfs_report)); return; } + let metrics; + try { + metrics = await aggregatorRegistry.clusterMetrics(); + } catch (err) { + dbg.error('start_server: Could not get the metrics, got an error', err); + res.writeHead(504, { 'Content-Type': 'application/json' }); + const reply = JSON.stringify({ + error: 'Internal server error - timeout', + message: 'Looks like the server is taking a long time to respond (Could not get the metrics)', + }); + res.end(reply); + return; + } + if (req.url === '' || req.url === '/') { + res.writeHead(200, { 'Content-Type': aggregatorRegistry.contentType }); + res.end(metrics); + return; + } // Serve report's metrics on the report name path const report_name = req.url.substr(1); const single_metrics = export_single_metrics(metrics, report_name); @@ -165,7 +178,7 @@ async function metrics_nsfs_stats_handler() { op_stats_counters: op_stats_counters, fs_worker_stats_counters: fs_worker_stats_counters }; - dbg.log1(`_create_nsfs_report: nsfs_report ${nsfs_report}`); + dbg.log1('_create_nsfs_report: nsfs_report', nsfs_report); return JSON.stringify(nsfs_report); } diff --git a/src/util/fork_utils.js b/src/util/fork_utils.js index 5b6e6378e8..52c00d7b60 100644 --- a/src/util/fork_utils.js +++ b/src/util/fork_utils.js @@ -30,9 +30,9 @@ const fs_workers_stats = {}; * * @param {number?} count number of workers to start. * @param {number?} metrics_port prometheus metris port. - * @returns {boolean} true if workers were started. + * @returns {Promise} true if workers were started. */ -function start_workers(metrics_port, count = 0) { +async function start_workers(metrics_port, count = 0) { const exit_events = []; if (cluster.isPrimary && count > 0) { for (let i = 0; i < count; ++i) { @@ -68,12 +68,12 @@ function start_workers(metrics_port, count = 0) { }); for (const id in cluster.workers) { if (id) { - cluster.workers[id].on('message', nsfs_io_state_handler); + cluster.workers[id].on('message', nsfs_io_stats_handler); } } if (metrics_port > 0) { dbg.log0('Starting metrics server', metrics_port); - prom_reporting.start_server(metrics_port, true); + await prom_reporting.start_server(metrics_port, true); dbg.log0('Started metrics server successfully'); } return true; @@ -82,7 +82,7 @@ function start_workers(metrics_port, count = 0) { return false; } -function nsfs_io_state_handler(msg) { +function nsfs_io_stats_handler(msg) { if (msg.io_stats) { for (const [key, value] of Object.entries(msg.io_stats)) { io_stats[key] += value;