Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backports to stage_5_17_2 #8591

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions config.js
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,8 @@ config.NSFS_LOW_FREE_SPACE_PERCENT_UNLEASH = 0.10;
// anonymous account name
config.ANONYMOUS_ACCOUNT_NAME = 'anonymous';

config.NFSF_UPLOAD_STREAM_MEM_THRESHOLD = 8 * 1024 * 1024;

////////////////////////////
// NSFS NON CONTAINERIZED //
////////////////////////////
Expand Down
12 changes: 11 additions & 1 deletion src/endpoint/endpoint.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,17 @@ async function main(options = {}) {
// the primary just forks and returns, workers will continue to serve
fork_count = options.forks ?? config.ENDPOINT_FORKS;
const metrics_port = options.metrics_port || config.EP_METRICS_SERVER_PORT;
if (fork_utils.start_workers(metrics_port, fork_count)) return;
/**
* Please notice that we can run the main in 2 states:
* 1. Only the primary process runs the main (fork is 0 or undefined) - everything that
* is implemented here would be run by this process.
* 2. A primary process with multiple forks (IMPORTANT) - if there is implementation that
* is only relevant to the primary process it should be implemented in
* fork_utils.start_workers because the primary process returns after start_workers
* and the forks will continue executing the code lines in this function
* */
const is_workers_started_from_primary = await fork_utils.start_workers(metrics_port, fork_count);
if (is_workers_started_from_primary) return;

const http_port = options.http_port || config.ENDPOINT_PORT;
const https_port = options.https_port || config.ENDPOINT_SSL_PORT;
Expand Down
12 changes: 10 additions & 2 deletions src/manage_nsfs/diagnose.js
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,17 @@ async function gather_metrics() {
const buffer = await buffer_utils.read_stream_join(res);
const body = buffer.toString('utf8');
metrics_output = JSON.parse(body);
if (!metrics_output) throw new Error('received empty metrics response', { cause: res.statusCode });
write_stdout_response(ManageCLIResponse.MetricsStatus, metrics_output);
} else if (res.statusCode >= 500 && res.rawHeaders.includes('application/json')) {
const buffer = await buffer_utils.read_stream_join(res);
const body = buffer.toString('utf8');
const error_output = JSON.parse(body);
if (!error_output) throw new Error('received empty metrics response', { cause: res.statusCode });
throw_cli_error({ ...ManageCLIError.MetricsStatusFailed, ...error_output });
} else {
throw new Error('received empty metrics response', { cause: res.statusCode });
}
if (!metrics_output) throw new Error('recieved empty metrics response', { cause: res.statusCode });
write_stdout_response(ManageCLIResponse.MetricsStatus, metrics_output);
} catch (err) {
dbg.warn('could not receive metrics response', err);
throw_cli_error({ ...ManageCLIError.MetricsStatusFailed, cause: err?.errors?.[0] || err });
Expand Down
29 changes: 16 additions & 13 deletions src/sdk/namespace_fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ const stream_utils = require('../util/stream_utils');
const buffer_utils = require('../util/buffer_utils');
const size_utils = require('../util/size_utils');
const native_fs_utils = require('../util/native_fs_utils');
const ChunkFS = require('../util/chunk_fs');
const FileWriter = require('../util/file_writer');
const LRUCache = require('../util/lru_cache');
const nb_native = require('../util/nb_native');
const RpcError = require('../rpc/rpc_error');
Expand Down Expand Up @@ -1563,30 +1563,33 @@ class NamespaceFS {
// Can be finetuned further on if needed and inserting the Semaphore logic inside
// Instead of wrapping the whole _upload_stream function (q_buffers lives outside of the data scope of the stream)
async _upload_stream({ fs_context, params, target_file, object_sdk, offset }) {
const { source_stream, copy_source } = params;
const { copy_source } = params;
try {
// Not using async iterators with ReadableStreams due to unsettled promises issues on abort/destroy
const md5_enabled = this._is_force_md5_enabled(object_sdk);
const chunk_fs = new ChunkFS({
const file_writer = new FileWriter({
target_file,
fs_context,
stats: this.stats,
namespace_resource_id: this.namespace_resource_id,
md5_enabled,
offset,
md5_enabled,
stats: this.stats,
bucket: params.bucket,
large_buf_size: multi_buffer_pool.get_buffers_pool(undefined).buf_size
large_buf_size: multi_buffer_pool.get_buffers_pool(undefined).buf_size,
namespace_resource_id: this.namespace_resource_id,
});
chunk_fs.on('error', err1 => dbg.error('namespace_fs._upload_stream: error occured on stream ChunkFS: ', err1));
file_writer.on('error', err => dbg.error('namespace_fs._upload_stream: error occured on FileWriter: ', err));
file_writer.on('finish', arg => dbg.log1('namespace_fs._upload_stream: finish occured on stream FileWriter: ', arg));
file_writer.on('close', arg => dbg.log1('namespace_fs._upload_stream: close occured on stream FileWriter: ', arg));

if (copy_source) {
await this.read_object_stream(copy_source, object_sdk, chunk_fs);
await this.read_object_stream(copy_source, object_sdk, file_writer);
} else if (params.source_params) {
await params.source_ns.read_object_stream(params.source_params, object_sdk, chunk_fs);
await params.source_ns.read_object_stream(params.source_params, object_sdk, file_writer);
} else {
await stream_utils.pipeline([source_stream, chunk_fs]);
await stream_utils.wait_finished(chunk_fs);
await stream_utils.pipeline([params.source_stream, file_writer]);
await stream_utils.wait_finished(file_writer);
}
return { digest: chunk_fs.digest, total_bytes: chunk_fs.total_bytes };
return { digest: file_writer.digest, total_bytes: file_writer.total_bytes };
} catch (error) {
dbg.error('_upload_stream had error: ', error);
throw error;
Expand Down
27 changes: 20 additions & 7 deletions src/server/analytic_services/prometheus_reporting.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,12 +61,7 @@ async function start_server(
const server = http.createServer(async (req, res) => {
// Serve all metrics on the root path for systems that have one or more forks running.
if (fork_enabled) {
const metrics = await aggregatorRegistry.clusterMetrics();
if (req.url === '' || req.url === '/') {
res.writeHead(200, { 'Content-Type': aggregatorRegistry.contentType });
res.end(metrics);
return;
}
// we would like this part to be first as clusterMetrics might fail.
if (req.url === '/metrics/nsfs_stats') {
res.writeHead(200, { 'Content-Type': 'text/plain' });
const nsfs_report = {
Expand All @@ -77,6 +72,24 @@ async function start_server(
res.end(JSON.stringify(nsfs_report));
return;
}
let metrics;
try {
metrics = await aggregatorRegistry.clusterMetrics();
} catch (err) {
dbg.error('start_server: Could not get the metrics, got an error', err);
res.writeHead(504, { 'Content-Type': 'application/json' });
const reply = JSON.stringify({
error: 'Internal server error - timeout',
message: 'Looks like the server is taking a long time to respond (Could not get the metrics)',
});
res.end(reply);
return;
}
if (req.url === '' || req.url === '/') {
res.writeHead(200, { 'Content-Type': aggregatorRegistry.contentType });
res.end(metrics);
return;
}
// Serve report's metrics on the report name path
const report_name = req.url.substr(1);
const single_metrics = export_single_metrics(metrics, report_name);
Expand Down Expand Up @@ -165,7 +178,7 @@ async function metrics_nsfs_stats_handler() {
op_stats_counters: op_stats_counters,
fs_worker_stats_counters: fs_worker_stats_counters
};
dbg.log1(`_create_nsfs_report: nsfs_report ${nsfs_report}`);
dbg.log1('_create_nsfs_report: nsfs_report', nsfs_report);
return JSON.stringify(nsfs_report);
}

Expand Down
2 changes: 1 addition & 1 deletion src/test/unit_tests/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ require('./test_bucket_chunks_builder');
require('./test_mirror_writer');
require('./test_namespace_fs');
require('./test_ns_list_objects');
require('./test_chunk_fs');
require('./test_file_writer');
require('./test_namespace_fs_mpu');
require('./test_nb_native_fs');
require('./test_s3select');
Expand Down
2 changes: 1 addition & 1 deletion src/test/unit_tests/nc_index.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ coretest.setup();

require('./test_namespace_fs');
require('./test_ns_list_objects');
require('./test_chunk_fs');
require('./test_file_writer');
require('./test_namespace_fs_mpu');
require('./test_nb_native_fs');
require('./test_nc_nsfs_cli');
Expand Down
34 changes: 0 additions & 34 deletions src/test/unit_tests/test_chunk_fs.js

This file was deleted.

69 changes: 69 additions & 0 deletions src/test/unit_tests/test_file_writer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/* Copyright (C) 2020 NooBaa */
/* eslint-disable no-invalid-this */
'use strict';

const mocha = require('mocha');
const config = require('../../../config');
const file_writer_hashing = require('../../tools/file_writer_hashing');
// Snapshot of the configured IOV max taken when this module is loaded; the suite's afterEach hook
// assigns it back to config.NSFS_DEFAULT_IOV_MAX after every test.
const orig_iov_max = config.NSFS_DEFAULT_IOV_MAX;

// For the small iov_max tests we use a smaller number of parts and chunks so that the tests finish
// in a reasonable period of time, because at most 1 or 2 buffers are flushed at a time.
const small_iov_num_parts = 20;


/**
 * FileWriter unit tests.
 *
 * Every case delegates to one of the helpers exposed by file_writer_hashing
 * (hash_target / file_target); the cases differ only in the arguments forwarded to it.
 */
mocha.describe('FileWriter', function() {
    const RUN_TIMEOUT = 10 * 60 * 1000;

    // One entry per test: the mocha title, the file_writer_hashing helper to invoke
    // and the positional arguments passed to that helper.
    const test_cases = [
        { title: 'Concurrent FileWriter with hash target', target: 'hash_target', args: [] },
        { title: 'Concurrent FileWriter with file target', target: 'file_target', args: [] },
        {
            title: 'Concurrent FileWriter with hash target - iov_max=1',
            target: 'hash_target',
            args: [undefined, small_iov_num_parts, 1],
        },
        {
            title: 'Concurrent FileWriter with file target - iov_max=1',
            target: 'file_target',
            args: [undefined, small_iov_num_parts, 1],
        },
        {
            title: 'Concurrent FileWriter with hash target - iov_max=2',
            target: 'hash_target',
            args: [undefined, small_iov_num_parts, 2],
        },
        {
            title: 'Concurrent FileWriter with file target - iov_max=2',
            target: 'file_target',
            args: [undefined, small_iov_num_parts, 2],
        },
        {
            // Goal: produce num_chunks > 1024 while total_chunks_size < config.NSFS_BUF_SIZE_L, so that
            // buffers are flushed because the max number of buffers was reached and not because the
            // max NSFS buffer size was reached.
            // With chunk size 100, 1024 chunks add up to only 100 * 1024 bytes, which the original author
            // notes is below config.NSFS_BUF_SIZE_L (value not visible in this file).
            // Original estimate of the chunk count: about (10 * 1024 * 1024) / 100, far above 1024
            // (assumes ~10MB of data per part - TODO confirm against file_writer_hashing).
            title: 'Concurrent FileWriter with file target - produce num_chunks > 1024 && total_chunks_size < config.NSFS_BUF_SIZE_L',
            target: 'file_target',
            // [chunk_size, parts_s]
            args: [100, 50],
        },
    ];

    // Put the IOV max back to the value captured at module load after each test.
    mocha.afterEach(function() {
        config.NSFS_DEFAULT_IOV_MAX = orig_iov_max;
    });

    for (const { title, target, args } of test_cases) {
        mocha.it(title, async function() {
            // mocha binds `this` to the test context, which is why a regular function is used here
            this.timeout(RUN_TIMEOUT);
            await file_writer_hashing[target](...args);
        });
    }
});
Loading
Loading