Skip to content

Commit

Permalink
fix(Bulkload): Avoid use reference of _metadata.files in
Browse files Browse the repository at this point in the history
download_sst_file (#2006)

    replica_bulk_loader::clear_bulk_load_states function cannot cancel already
downloading sst task, which access `_metadata.files` references.
But clear_bulk_load_states function will clear `_metadata.files`. It's
cause core dump. I use a copy of `_metadata.files` to solve this
problem.
  • Loading branch information
lupengfan1 committed Aug 21, 2024
1 parent 2b90137 commit 5469727
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 37 deletions.
65 changes: 29 additions & 36 deletions src/replica/bulk_load/replica_bulk_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -519,41 +519,39 @@ void replica_bulk_loader::download_files(const std::string &provider_name,
}

// download sst files asynchronously
if (!_metadata.files.empty()) {
const file_meta &f_meta = _metadata.files[0];
_download_files_task[f_meta.name] = tasking::enqueue(
LPC_BACKGROUND_BULK_LOAD,
tracker(),
std::bind(&replica_bulk_loader::download_sst_file, this, remote_dir, local_dir, 0, fs));
std::vector<::dsn::replication::file_meta> download_file_metas;
{
zauto_read_lock l(_lock);
std::copy(_metadata.files.begin(),
_metadata.files.end(),
std::back_inserter(download_file_metas));
}
if (!download_file_metas.empty()) {
_download_files_task[download_file_metas.back().name] =
tasking::enqueue(LPC_BACKGROUND_BULK_LOAD,
tracker(),
std::bind(&replica_bulk_loader::download_sst_file,
this,
remote_dir,
local_dir,
download_file_metas,
fs));
}
}

// ThreadPool: THREAD_POOL_DEFAULT
void replica_bulk_loader::download_sst_file(const std::string &remote_dir,
const std::string &local_dir,
int32_t file_index,
dist::block_service::block_filesystem *fs)
void replica_bulk_loader::download_sst_file(
const std::string &remote_dir,
const std::string &local_dir,
std::vector<::dsn::replication::file_meta> &download_file_metas,
dist::block_service::block_filesystem *fs)
{
if (_status != bulk_load_status::BLS_DOWNLOADING) {
LOG_WARNING_PREFIX("Cancel download_sst_file task, because bulk_load local_status is {}. "
"local_dir: {} , file_index is {}.",
enum_to_string(_status),
local_dir,
file_index);
return;
}
file_meta f_meta;
{
zauto_read_lock l(_lock);
if (file_index < _metadata.files.size()) {
f_meta = _metadata.files[file_index];
}
}
if (f_meta.name.empty()) {
LOG_WARNING_PREFIX("Cannot get file_meta of {}, cancel download_sst_file task.",
file_index);
LOG_WARNING_PREFIX("Cancel download_sst_file task, because bulk_load local_status is {}.",
enum_to_string(_status));
return;
}
const file_meta &f_meta = download_file_metas.back();
uint64_t f_size = 0;
std::string f_md5;
error_code ec = _stub->_block_service_manager.download_file(
Expand Down Expand Up @@ -604,26 +602,21 @@ void replica_bulk_loader::download_sst_file(const std::string &remote_dir,
return;
}
// download file succeed, update progress
download_file_metas.pop_back();
update_bulk_load_download_progress(f_size, f_meta.name);
METRIC_VAR_INCREMENT(bulk_load_download_file_successful_count);
METRIC_VAR_INCREMENT_BY(bulk_load_download_file_bytes, f_size);

// download next file
{
zauto_read_lock l(_lock);
if (file_index + 1 < _metadata.files.size()) {
f_meta = _metadata.files[file_index + 1];
}
}
if (!f_meta.name.empty()) {
_download_files_task[f_meta.name] =
if (!download_file_metas.empty()) {
_download_files_task[download_file_metas.back().name] =
tasking::enqueue(LPC_BACKGROUND_BULK_LOAD,
tracker(),
std::bind(&replica_bulk_loader::download_sst_file,
this,
remote_dir,
local_dir,
file_index + 1,
download_file_metas,
fs));
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/replica/bulk_load/replica_bulk_loader.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ class replica_bulk_loader : replica_base
// download sst files from remote provider
void download_sst_file(const std::string &remote_dir,
const std::string &local_dir,
int32_t file_index,
std::vector<::dsn::replication::file_meta> &download_file_metas,
dist::block_service::block_filesystem *fs);

// \return ERR_PATH_NOT_FOUND: file not exist
Expand Down

0 comments on commit 5469727

Please sign in to comment.