Skip to content

Commit

Permalink
[bug](s3) Fix object data is overwritten by empty object Fix wrong em…
Browse files Browse the repository at this point in the history
…pty s3 file (apache#32252)
  • Loading branch information
platoneko authored Mar 14, 2024
1 parent a18845b commit 8fdd809
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 11 deletions.
8 changes: 4 additions & 4 deletions be/src/io/fs/err_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ Status localfs_error(const std::error_code& ec, std::string_view msg) {
} else if (ec == std::errc::permission_denied) {
return Status::Error<PERMISSION_DENIED, false>(msg);
} else {
return Status::Error<doris::INTERNAL_ERROR, false>("{}: {}", msg, ec.message());
return Status::Error<ErrorCode::INTERNAL_ERROR, false>("{}: {}", msg, ec.message());
}
}

Expand All @@ -106,8 +106,8 @@ Status localfs_error(int posix_errno, std::string_view msg) {
case EACCES:
return Status::Error<PERMISSION_DENIED, false>(msg);
default:
return Status::Error<doris::INTERNAL_ERROR, false>("{}: {}", msg,
std::strerror(posix_errno));
return Status::Error<ErrorCode::INTERNAL_ERROR, false>("{}: {}", msg,
std::strerror(posix_errno));
}
}

Expand All @@ -122,7 +122,7 @@ Status s3fs_error(const Aws::S3::S3Error& err, std::string_view msg) {
err.GetExceptionName(), err.GetMessage(),
err.GetErrorType());
default:
return Status::Error<doris::INTERNAL_ERROR, false>(
return Status::Error<ErrorCode::INTERNAL_ERROR, false>(
"{}: {} {} code={} type={}", msg, err.GetExceptionName(), err.GetMessage(),
err.GetResponseCode(), err.GetErrorType());
}
Expand Down
11 changes: 5 additions & 6 deletions be/src/io/fs/s3_file_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,8 @@ S3FileWriter::~S3FileWriter() {
// if we don't abort multi part upload, the uploaded part in object
// store will not automatically reclaim itself, it would cost more money
static_cast<void>(_abort());
_bytes_written = 0;
}
s3_bytes_written_total << _bytes_written;
s3_bytes_written_total << _bytes_appended;
s3_file_being_written << -1;
}

Expand Down Expand Up @@ -207,8 +206,10 @@ Status S3FileWriter::close() {
auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
DCHECK(buf != nullptr);
buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); });
} else if (_create_empty_file) {
// if there is no pending buffer, we need to create an empty file
}

if (_bytes_appended == 0 && _create_empty_file) {
// No data written, but need to create an empty file
auto builder = FileBufferBuilder();
builder.set_type(BufferType::UPLOAD)
.set_upload_callback([this](UploadFileBuffer& buf) { _put_object(buf); })
Expand Down Expand Up @@ -390,7 +391,6 @@ void S3FileWriter::_upload_one_part(int64_t part_num, UploadFileBuffer& buf) {

std::unique_lock<std::mutex> lck {_completed_lock};
_completed_parts.emplace_back(std::move(completed_part));
_bytes_written += buf.get_size();
}

Status S3FileWriter::_complete() {
Expand Down Expand Up @@ -501,7 +501,6 @@ void S3FileWriter::_put_object(UploadFileBuffer& buf) {
buf.set_status(_st);
return;
}
_bytes_written += buf.get_size();
s3_file_created_total << 1;
}

Expand Down
1 change: 0 additions & 1 deletion be/src/io/fs/s3_file_writer.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ class S3FileWriter final : public FileWriter {

std::atomic_bool _failed = false;
Status _st;
size_t _bytes_written = 0;

std::shared_ptr<FileBuffer> _pending_buf;
int64_t _expiration_time;
Expand Down

0 comments on commit 8fdd809

Please sign in to comment.