diff --git a/src/sdk/namespace_fs.js b/src/sdk/namespace_fs.js index bdde9b56d1..e4b57ba773 100644 --- a/src/sdk/namespace_fs.js +++ b/src/sdk/namespace_fs.js @@ -931,22 +931,38 @@ class NamespaceFS { async read_object_md(params, object_sdk) { const fs_context = this.prepare_fs_context(object_sdk); let file_path; + let stat; + let isDir; + let retries = (this._is_versioning_enabled() || this._is_versioning_suspended()) ? config.NSFS_RENAME_RETRIES : 0; + const is_gpfs = native_fs_utils._is_gpfs(fs_context); try { - file_path = await this._find_version_path(fs_context, params, true); - await this._check_path_in_bucket_boundaries(fs_context, file_path); - await this._load_bucket(params, fs_context); - let stat = await nb_native().fs.stat(fs_context, file_path); - - const isDir = native_fs_utils.isDirectory(stat); - if (isDir) { - if (!stat.xattr?.[XATTR_DIR_CONTENT] || !params.key.endsWith('/')) { - throw error_utils.new_error_code('ENOENT', 'NoSuchKey'); + for (;;) { + try { + file_path = await this._find_version_path(fs_context, params, true); + await this._check_path_in_bucket_boundaries(fs_context, file_path); + await this._load_bucket(params, fs_context); + stat = await nb_native().fs.stat(fs_context, file_path); + isDir = native_fs_utils.isDirectory(stat); + if (isDir) { + if (!stat.xattr?.[XATTR_DIR_CONTENT] || !params.key.endsWith('/')) { + throw error_utils.new_error_code('ENOENT', 'NoSuchKey'); } else if (stat.xattr?.[XATTR_DIR_CONTENT] !== '0') { // find dir object content file path and return its stat + xattr of its parent directory const dir_content_path = await this._find_version_path(fs_context, params); const dir_content_path_stat = await nb_native().fs.stat(fs_context, dir_content_path); const xattr = stat.xattr; stat = { ...dir_content_path_stat, xattr }; + } + } + if (this._is_mismatch_version_id(stat, params.version_id)) { + dbg.warn('NamespaceFS.read_object_md mismatch version_id', file_path, params.version_id, this._get_version_id_by_xattr(stat)); + throw error_utils.new_error_code('MISMATCH_VERSION', 'file version does not match the version we asked for'); + } + break; + } catch (err) { + dbg.warn(`NamespaceFS.read_object_md: retrying retries=${retries} file_path=${file_path}`, err); + retries -= 1; + if (retries <= 0 || !native_fs_utils.should_retry_link_unlink(is_gpfs, err)) throw err; } } this._throw_if_delete_marker(stat, params); @@ -959,33 +975,43 @@ class NamespaceFS { } } + async _is_empty_directory_content(file_path, fs_context, params) { + const is_dir_content = this._is_directory_content(file_path, params.key); + if (is_dir_content) { + try { + const md_path = this._get_file_md_path(params); + const dir_stat = await nb_native().fs.stat(fs_context, md_path); + if (dir_stat && dir_stat.xattr[XATTR_DIR_CONTENT] === '0') return true; + } catch (err) { + //failed to get object + new NoobaaEvent(NoobaaEvent.OBJECT_GET_FAILED).create_event(params.key, + {bucket_path: this.bucket_path, object_name: params.key}, err); + dbg.log0('NamespaceFS: read_object_stream couldnt find dir content xattr', err); + } + } + return false; + } + // eslint-disable-next-line max-statements async read_object_stream(params, object_sdk, res) { - let file; let buffer_pool_cleanup = null; const fs_context = this.prepare_fs_context(object_sdk); let file_path; + let file; try { await this._load_bucket(params, fs_context); + let retries = (this._is_versioning_enabled() || this._is_versioning_suspended()) ? config.NSFS_RENAME_RETRIES : 0; + const is_gpfs = native_fs_utils._is_gpfs(fs_context); + let stat; + for (;;) { + try { file_path = await this._find_version_path(fs_context, params); await this._check_path_in_bucket_boundaries(fs_context, file_path); // NOTE: don't move this code after the open // this can lead to ENOENT failures due to file not exists when content size is 0 // if entry is a directory object and its content size = 0 - return empty response - const is_dir_content = this._is_directory_content(file_path, params.key); - if (is_dir_content) { - try { - const md_path = this._get_file_md_path(params); - const dir_stat = await nb_native().fs.stat(fs_context, md_path); - if (dir_stat && dir_stat.xattr[XATTR_DIR_CONTENT] === '0') return null; - } catch (err) { - //failed to get object - new NoobaaEvent(NoobaaEvent.OBJECT_GET_FAILED).create_event(params.key, - {bucket_path: this.bucket_path, object_name: params.key}, err); - dbg.log0('NamespaceFS: read_object_stream couldnt find dir content xattr', err); - } - } + if (await this._is_empty_directory_content(file_path, fs_context, params)) return null; file = await nb_native().fs.open( fs_context, @@ -993,8 +1019,26 @@ class NamespaceFS { config.NSFS_OPEN_READ_MODE, native_fs_utils.get_umasked_mode(config.BASE_MODE_FILE), ); - - const stat = await file.stat(fs_context); + stat = await file.stat(fs_context); + if (this._is_mismatch_version_id(stat, params.version_id)) { + dbg.warn('NamespaceFS.read_object_stream mismatch version_id', params.version_id, this._get_version_id_by_xattr(stat)); + throw error_utils.new_error_code('MISMATCH_VERSION', 'file version does not match the version we asked for'); + } + break; + } catch (err) { + dbg.warn(`NamespaceFS.read_object_stream: retrying retries=${retries} file_path=${file_path}`, err); + if (file) { + await file.close(fs_context); + file = null; + } + retries -= 1; + if (retries <= 0 || !native_fs_utils.should_retry_link_unlink(is_gpfs, err)) { + new NoobaaEvent(NoobaaEvent.OBJECT_GET_FAILED).create_event(params.key, + {bucket_path: this.bucket_path, object_name: params.key}, err); + throw err; + } + } + } this._throw_if_delete_marker(stat, params); // await this._fail_if_archived_or_sparse_file(fs_context, file_path, stat); @@ -2755,11 +2799,15 @@ class NamespaceFS { } } + _is_mismatch_version_id(stat, version_id) { + return version_id && !this._is_versioning_disabled() && this._get_version_id_by_xattr(stat) !== version_id; + } + /** - * _delete_single_object_versioned does the following - + * _delete_single_object_versioned does the following - * if the deleted version is the latest - try to delete it from the latest version location * if the deleted version is in .versions/ - unlink the version - * we call check_version_moved() in case of concurrent puts, the version might move to .versions/ + * we call check_version_moved() in case of concurrent puts, the version might move to .versions/ * if the version moved we will retry * @param {nb.NativeFSContext} fs_context * @param {string} key @@ -2964,8 +3012,8 @@ class NamespaceFS { const bucket_tmp_dir_path = this.get_bucket_tmpdir_full_path(); if (this._is_versioning_enabled() || suspended_and_latest_is_not_null) { await native_fs_utils._make_path_dirs(versioned_path, fs_context); - await native_fs_utils.safe_move(fs_context, latest_ver_path, versioned_path, latest_ver_info, - gpfs_options?.delete_version, bucket_tmp_dir_path); + await native_fs_utils.safe_move_posix(fs_context, latest_ver_path, versioned_path, latest_ver_info, + bucket_tmp_dir_path); if (suspended_and_latest_is_not_null) { // remove a version (or delete marker) with null version ID from .versions/ (if exists) await this._delete_null_version_from_versions_directory(params.key, fs_context); @@ -3136,7 +3184,7 @@ class NamespaceFS { dst_file = await native_fs_utils.open_file(fs_context, this.bucket_path, dst_path, 'r'); } return { - move_to_versions: { src_file: dst_file, dir_file, dst_file: versioned_file }, + move_to_versions: { src_file: dst_file, dir_file, should_override: false }, move_to_dst: { src_file, dst_file, dir_file, versioned_file } }; } catch (err) { diff --git a/src/util/native_fs_utils.js b/src/util/native_fs_utils.js index bafb1a2039..8ef267d2b8 100644 --- a/src/util/native_fs_utils.js +++ b/src/util/native_fs_utils.js @@ -167,7 +167,7 @@ function _is_gpfs(fs_context) { async function safe_move(fs_context, src_path, dst_path, src_ver_info, gpfs_options, tmp_dir_path) { if (_is_gpfs(fs_context)) { - await safe_move_gpfs(fs_context, src_path, dst_path, gpfs_options); + await safe_move_gpfs(fs_context, src_path, dst_path, gpfs_options, src_ver_info); } else { await safe_move_posix(fs_context, src_path, dst_path, src_ver_info, tmp_dir_path); } @@ -199,10 +199,15 @@ async function safe_move_posix(fs_context, src_path, dst_path, src_ver_info, tmp // safe_link_posix links src_path to dst_path while verifing dst_path has the expected ino and mtimeNsBigint values // src_file exists on uploads (open mode = 'w' ) or deletions // on uploads (open mode 'wt') the dir_file is used as the link source -async function safe_move_gpfs(fs_context, src_path, dst_path, gpfs_options) { - const { src_file = undefined, dst_file = undefined, dir_file = undefined, should_unlink = false } = gpfs_options; +async function safe_move_gpfs(fs_context, src_path, dst_path, gpfs_options, src_ver_info) { + const { src_file = undefined, dst_file = undefined, dir_file = undefined, should_unlink = false, + should_override = true } = gpfs_options; dbg.log1('Namespace_fs.safe_move_gpfs', src_path, dst_path, dst_file, should_unlink); - await safe_link_gpfs(fs_context, dst_path, src_file || dir_file, dst_file); + if (should_override) { + await safe_link_gpfs(fs_context, dst_path, src_file || dir_file, dst_file); + } else { + await safe_link_posix(fs_context, src_path, dst_path, src_ver_info); + } if (should_unlink) await safe_unlink_gpfs(fs_context, src_path, src_file, dir_file); } @@ -268,7 +273,7 @@ async function safe_unlink_gpfs(fs_context, to_delete_path, to_delete_file, dir_ } function should_retry_link_unlink(is_gpfs, err) { - const should_retry_general = ['ENOENT', 'EEXIST', 'VERSION_MOVED'].includes(err.code); + const should_retry_general = ['ENOENT', 'EEXIST', 'VERSION_MOVED', 'MISMATCH_VERSION'].includes(err.code); const should_retry_gpfs = [gpfs_link_unlink_retry_err, gpfs_unlink_retry_catch].includes(err.code); const should_retry_posix = [posix_link_retry_err, posix_unlink_retry_err].includes(err.message); return should_retry_general || (is_gpfs ? should_retry_gpfs : should_retry_posix);