Skip to content

Commit

Permalink
Merge pull request #8439 from nadavMiz/warp-put-head-fix1
Browse files Browse the repository at this point in the history
NSFS | versioning | fix GET/HEAD object concurrency issues
  • Loading branch information
nadavMiz authored Oct 8, 2024
2 parents 4d97f3d + 2c590cb commit 4c41484
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 35 deletions.
108 changes: 78 additions & 30 deletions src/sdk/namespace_fs.js
Original file line number Diff line number Diff line change
Expand Up @@ -931,22 +931,38 @@ class NamespaceFS {
async read_object_md(params, object_sdk) {
const fs_context = this.prepare_fs_context(object_sdk);
let file_path;
let stat;
let isDir;
let retries = (this._is_versioning_enabled() || this._is_versioning_suspended()) ? config.NSFS_RENAME_RETRIES : 0;
const is_gpfs = native_fs_utils._is_gpfs(fs_context);
try {
file_path = await this._find_version_path(fs_context, params, true);
await this._check_path_in_bucket_boundaries(fs_context, file_path);
await this._load_bucket(params, fs_context);
let stat = await nb_native().fs.stat(fs_context, file_path);

const isDir = native_fs_utils.isDirectory(stat);
if (isDir) {
if (!stat.xattr?.[XATTR_DIR_CONTENT] || !params.key.endsWith('/')) {
throw error_utils.new_error_code('ENOENT', 'NoSuchKey');
for (;;) {
try {
file_path = await this._find_version_path(fs_context, params, true);
await this._check_path_in_bucket_boundaries(fs_context, file_path);
await this._load_bucket(params, fs_context);
stat = await nb_native().fs.stat(fs_context, file_path);
isDir = native_fs_utils.isDirectory(stat);
if (isDir) {
if (!stat.xattr?.[XATTR_DIR_CONTENT] || !params.key.endsWith('/')) {
throw error_utils.new_error_code('ENOENT', 'NoSuchKey');
} else if (stat.xattr?.[XATTR_DIR_CONTENT] !== '0') {
// find dir object content file path and return its stat + xattr of its parent directory
const dir_content_path = await this._find_version_path(fs_context, params);
const dir_content_path_stat = await nb_native().fs.stat(fs_context, dir_content_path);
const xattr = stat.xattr;
stat = { ...dir_content_path_stat, xattr };
}
}
if (this._is_mismatch_version_id(stat, params.version_id)) {
dbg.warn('NamespaceFS.read_object_md mismatch version_id', file_path, params.version_id, this._get_version_id_by_xattr(stat));
throw error_utils.new_error_code('MISMATCH_VERSION', 'file version does not match the version we asked for');
}
break;
} catch (err) {
dbg.warn(`NamespaceFS.read_object_md: retrying retries=${retries} file_path=${file_path}`, err);
retries -= 1;
if (retries <= 0 || !native_fs_utils.should_retry_link_unlink(is_gpfs, err)) throw err;
}
}
this._throw_if_delete_marker(stat, params);
Expand All @@ -959,42 +975,70 @@ class NamespaceFS {
}
}

async _is_empty_directory_content(file_path, fs_context, params) {
const is_dir_content = this._is_directory_content(file_path, params.key);
if (is_dir_content) {
try {
const md_path = this._get_file_md_path(params);
const dir_stat = await nb_native().fs.stat(fs_context, md_path);
if (dir_stat && dir_stat.xattr[XATTR_DIR_CONTENT] === '0') return true;
} catch (err) {
//failed to get object
new NoobaaEvent(NoobaaEvent.OBJECT_GET_FAILED).create_event(params.key,
{bucket_path: this.bucket_path, object_name: params.key}, err);
dbg.log0('NamespaceFS: read_object_stream couldnt find dir content xattr', err);
}
}
return false;
}

// eslint-disable-next-line max-statements
async read_object_stream(params, object_sdk, res) {
let file;
let buffer_pool_cleanup = null;
const fs_context = this.prepare_fs_context(object_sdk);
let file_path;
let file;
try {
await this._load_bucket(params, fs_context);
let retries = (this._is_versioning_enabled() || this._is_versioning_suspended()) ? config.NSFS_RENAME_RETRIES : 0;
const is_gpfs = native_fs_utils._is_gpfs(fs_context);
let stat;
for (;;) {
try {
file_path = await this._find_version_path(fs_context, params);
await this._check_path_in_bucket_boundaries(fs_context, file_path);

// NOTE: don't move this code after the open
// this can lead to ENOENT failures due to file not exists when content size is 0
// if entry is a directory object and its content size = 0 - return empty response
const is_dir_content = this._is_directory_content(file_path, params.key);
if (is_dir_content) {
try {
const md_path = this._get_file_md_path(params);
const dir_stat = await nb_native().fs.stat(fs_context, md_path);
if (dir_stat && dir_stat.xattr[XATTR_DIR_CONTENT] === '0') return null;
} catch (err) {
//failed to get object
new NoobaaEvent(NoobaaEvent.OBJECT_GET_FAILED).create_event(params.key,
{bucket_path: this.bucket_path, object_name: params.key}, err);
dbg.log0('NamespaceFS: read_object_stream couldnt find dir content xattr', err);
}
}
if (await this._is_empty_directory_content(file_path, fs_context, params)) return null;

file = await nb_native().fs.open(
fs_context,
file_path,
config.NSFS_OPEN_READ_MODE,
native_fs_utils.get_umasked_mode(config.BASE_MODE_FILE),
);

const stat = await file.stat(fs_context);
stat = await file.stat(fs_context);
if (this._is_mismatch_version_id(stat, params.version_id)) {
dbg.warn('NamespaceFS.read_object_stream mismatch version_id', params.version_id, this._get_version_id_by_xattr(stat));
throw error_utils.new_error_code('MISMATCH_VERSION', 'file version does not match the version we asked for');
}
break;
} catch (err) {
dbg.warn(`NamespaceFS.read_object_stream: retrying retries=${retries} file_path=${file_path}`, err);
if (file) {
await file.close(fs_context);
file = null;
}
retries -= 1;
if (retries <= 0 || !native_fs_utils.should_retry_link_unlink(is_gpfs, err)) {
new NoobaaEvent(NoobaaEvent.OBJECT_GET_FAILED).create_event(params.key,
{bucket_path: this.bucket_path, object_name: params.key}, err);
throw err;
}
}
}
this._throw_if_delete_marker(stat, params);
// await this._fail_if_archived_or_sparse_file(fs_context, file_path, stat);

Expand Down Expand Up @@ -2755,11 +2799,15 @@ class NamespaceFS {
}
}

_is_mismatch_version_id(stat, version_id) {
return version_id && !this._is_versioning_disabled() && this._get_version_id_by_xattr(stat) !== version_id;
}

/**
* _delete_single_object_versioned does the following -
* _delete_single_object_versioned does the following -
* if the deleted version is the latest - try to delete it from the latest version location
* if the deleted version is in .versions/ - unlink the version
* we call check_version_moved() in case of concurrent puts, the version might move to .versions/
* we call check_version_moved() in case of concurrent puts, the version might move to .versions/
* if the version moved we will retry
* @param {nb.NativeFSContext} fs_context
* @param {string} key
Expand Down Expand Up @@ -2964,8 +3012,8 @@ class NamespaceFS {
const bucket_tmp_dir_path = this.get_bucket_tmpdir_full_path();
if (this._is_versioning_enabled() || suspended_and_latest_is_not_null) {
await native_fs_utils._make_path_dirs(versioned_path, fs_context);
await native_fs_utils.safe_move(fs_context, latest_ver_path, versioned_path, latest_ver_info,
gpfs_options?.delete_version, bucket_tmp_dir_path);
await native_fs_utils.safe_move_posix(fs_context, latest_ver_path, versioned_path, latest_ver_info,
bucket_tmp_dir_path);
if (suspended_and_latest_is_not_null) {
// remove a version (or delete marker) with null version ID from .versions/ (if exists)
await this._delete_null_version_from_versions_directory(params.key, fs_context);
Expand Down Expand Up @@ -3136,7 +3184,7 @@ class NamespaceFS {
dst_file = await native_fs_utils.open_file(fs_context, this.bucket_path, dst_path, 'r');
}
return {
move_to_versions: { src_file: dst_file, dir_file, dst_file: versioned_file },
move_to_versions: { src_file: dst_file, dir_file, should_override: false },
move_to_dst: { src_file, dst_file, dir_file, versioned_file }
};
} catch (err) {
Expand Down
15 changes: 10 additions & 5 deletions src/util/native_fs_utils.js
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ function _is_gpfs(fs_context) {

async function safe_move(fs_context, src_path, dst_path, src_ver_info, gpfs_options, tmp_dir_path) {
if (_is_gpfs(fs_context)) {
await safe_move_gpfs(fs_context, src_path, dst_path, gpfs_options);
await safe_move_gpfs(fs_context, src_path, dst_path, gpfs_options, src_ver_info);
} else {
await safe_move_posix(fs_context, src_path, dst_path, src_ver_info, tmp_dir_path);
}
Expand Down Expand Up @@ -199,10 +199,15 @@ async function safe_move_posix(fs_context, src_path, dst_path, src_ver_info, tmp
// safe_link_posix links src_path to dst_path while verifing dst_path has the expected ino and mtimeNsBigint values
// src_file exists on uploads (open mode = 'w' ) or deletions
// on uploads (open mode 'wt') the dir_file is used as the link source
async function safe_move_gpfs(fs_context, src_path, dst_path, gpfs_options) {
const { src_file = undefined, dst_file = undefined, dir_file = undefined, should_unlink = false } = gpfs_options;
async function safe_move_gpfs(fs_context, src_path, dst_path, gpfs_options, src_ver_info) {
const { src_file = undefined, dst_file = undefined, dir_file = undefined, should_unlink = false,
should_override = true } = gpfs_options;
dbg.log1('Namespace_fs.safe_move_gpfs', src_path, dst_path, dst_file, should_unlink);
await safe_link_gpfs(fs_context, dst_path, src_file || dir_file, dst_file);
if (should_override) {
await safe_link_gpfs(fs_context, dst_path, src_file || dir_file, dst_file);
} else {
await safe_link_posix(fs_context, src_path, dst_path, src_ver_info);
}
if (should_unlink) await safe_unlink_gpfs(fs_context, src_path, src_file, dir_file);
}

Expand Down Expand Up @@ -268,7 +273,7 @@ async function safe_unlink_gpfs(fs_context, to_delete_path, to_delete_file, dir_
}

function should_retry_link_unlink(is_gpfs, err) {
const should_retry_general = ['ENOENT', 'EEXIST', 'VERSION_MOVED'].includes(err.code);
const should_retry_general = ['ENOENT', 'EEXIST', 'VERSION_MOVED', 'MISMATCH_VERSION'].includes(err.code);
const should_retry_gpfs = [gpfs_link_unlink_retry_err, gpfs_unlink_retry_catch].includes(err.code);
const should_retry_posix = [posix_link_retry_err, posix_unlink_retry_err].includes(err.message);
return should_retry_general || (is_gpfs ? should_retry_gpfs : should_retry_posix);
Expand Down

0 comments on commit 4c41484

Please sign in to comment.