Skip to content

Commit

Permalink
Support streaming for lz4 recompress
Browse files Browse the repository at this point in the history
This reduces peak memory usage of update_engine from
~700MB to ~400MB. As we no longer need to cache the entire patched
blocks in memory, data are written to disk as they come.

Test: th
Change-Id: I7b353dbaee4ee63e984ec2014476e3d27387e0fc
  • Loading branch information
zhangxp1998 committed Feb 25, 2022
1 parent 760c334 commit 636ba2f
Show file tree
Hide file tree
Showing 6 changed files with 155 additions and 99 deletions.
8 changes: 4 additions & 4 deletions common/utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1062,16 +1062,16 @@ string GetExclusionName(const string& str_to_convert) {
return base::NumberToString(base::StringPieceHash()(str_to_convert));
}

static bool ParseTimestamp(const std::string& str, int64_t* out) {
if (!base::StringToInt64(str, out)) {
static bool ParseTimestamp(std::string_view str, int64_t* out) {
if (!base::StringToInt64(base::StringPiece(str.data(), str.size()), out)) {
LOG(WARNING) << "Invalid timestamp: " << str;
return false;
}
return true;
}

ErrorCode IsTimestampNewer(const std::string& old_version,
const std::string& new_version) {
ErrorCode IsTimestampNewer(const std::string_view old_version,
const std::string_view new_version) {
if (old_version.empty() || new_version.empty()) {
LOG(WARNING)
<< "One of old/new timestamp is empty, permit update anyway. Old: "
Expand Down
4 changes: 2 additions & 2 deletions common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,8 @@ std::string GetExclusionName(const std::string& str_to_convert);
// integer.
// Return kPayloadTimestampError if both are integers but |new_version| <
// |old_version|.
ErrorCode IsTimestampNewer(const std::string& old_version,
const std::string& new_version);
ErrorCode IsTimestampNewer(const std::string_view old_version,
const std::string_view new_version);

std::unique_ptr<android::base::MappedFile> GetReadonlyZeroBlock(size_t size);

Expand Down
218 changes: 131 additions & 87 deletions lz4diff/lz4patch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

#include "android-base/strings.h"
#include "lz4diff/lz4diff.h"
#include "lz4diff/lz4diff.pb.h"
#include "lz4diff_compress.h"
#include "lz4diff_format.h"
#include "puffin/puffpatch.h"
Expand Down Expand Up @@ -168,45 +169,6 @@ bool bspatch(std::string_view input_data,
return err == 0;
}

bool ApplyPostfixPatch(
std::string_view recompressed_blob,
const google::protobuf::RepeatedPtrField<CompressedBlockInfo>&
dst_block_info,
Blob* output) {
// Output size should be always identical to size of recompressed_blob
output->clear();
output->reserve(recompressed_blob.size());
size_t offset = 0;
for (const auto& block_info : dst_block_info) {
auto block =
recompressed_blob.substr(offset, block_info.compressed_length());
if (!block_info.sha256_hash().empty()) {
Blob actual_hash;
CHECK(HashCalculator::RawHashOfBytes(
block.data(), block.size(), &actual_hash));
if (ToStringView(actual_hash) != block_info.sha256_hash()) {
LOG(ERROR) << "Block " << block_info
<< " is corrupted. This usually means the patch generator "
"used a different version of LZ4, or an incompatible LZ4 "
"patch generator was used, or LZ4 produces different "
"output on different platforms. Expected hash: "
<< HexEncode(block_info.sha256_hash())
<< ", actual hash: " << HexEncode(actual_hash);
}
}
if (!block_info.postfix_bspatch().empty()) {
Blob fixed_block;
TEST_AND_RETURN_FALSE(
bspatch(block, block_info.postfix_bspatch(), &fixed_block));
output->insert(output->end(), fixed_block.begin(), fixed_block.end());
} else {
output->insert(output->end(), block.begin(), block.end());
}
offset += block_info.compressed_length();
}
return true;
}

bool puffpatch(std::string_view input_data,
std::string_view patch_data,
Blob* output) {
Expand All @@ -219,6 +181,7 @@ bool puffpatch(std::string_view input_data,
std::vector<CompressedBlock> ToCompressedBlockVec(
const google::protobuf::RepeatedPtrField<CompressedBlockInfo>& rpf) {
std::vector<CompressedBlock> ret;
ret.reserve(rpf.size());
for (const auto& block : rpf) {
auto& info = ret.emplace_back();
info.compressed_length = block.compressed_length();
Expand All @@ -237,66 +200,147 @@ bool HasPosfixPatches(const Lz4diffPatch& patch) {
return false;
}

} // namespace
size_t GetCompressedSize(
const google::protobuf::RepeatedPtrField<CompressedBlockInfo>& info) {
size_t compressed_size = 0;
for (const auto& block : info) {
compressed_size += block.compressed_length();
}
return compressed_size;
}

bool Lz4Patch(std::string_view src_data,
std::string_view patch_data,
Blob* output) {
Lz4diffPatch patch;
TEST_AND_RETURN_FALSE(ParseLz4DifffPatch(patch_data, &patch));
size_t GetDecompressedSize(
const google::protobuf::RepeatedPtrField<CompressedBlockInfo>& info) {
size_t decompressed_size = 0;
for (const auto& block : info) {
decompressed_size += block.uncompressed_length();
}
return decompressed_size;
}

bool ApplyInnerPatch(Blob decompressed_src,
const Lz4diffPatch& patch,
Blob* decompressed_dst) {
switch (patch.pb_header.inner_type()) {
case InnerPatchType::BSDIFF:
TEST_AND_RETURN_FALSE(bspatch(
ToStringView(decompressed_src), patch.inner_patch, decompressed_dst));
break;
case InnerPatchType::PUFFDIFF:
TEST_AND_RETURN_FALSE(puffpatch(
ToStringView(decompressed_src), patch.inner_patch, decompressed_dst));
break;
default:
LOG(ERROR) << "Unsupported patch type: " << patch.pb_header.inner_type();
return false;
}
return true;
}

// TODO(zhangkelvin) Rewrite this in C++ 20 coroutine once that's available.
// Hand coding CPS is not fun.
bool Lz4Patch(std::string_view src_data,
const Lz4diffPatch& patch,
const SinkFunc& sink) {
auto decompressed_src = TryDecompressBlob(
src_data,
ToCompressedBlockVec(patch.pb_header.src_info().block_info()),
patch.pb_header.src_info().zero_padding_enabled());
TEST_AND_RETURN_FALSE(!decompressed_src.empty());
Blob decompressed_dst;
// This scope is here just so that |decompressed_src| can be freed earlier
// than function scope.
// This whole patching algorithm has non-trivial memory usage, as it needs to
// load source data in to memory and decompress that. Now both src and
// decompressed src data are in memory.
// TODO(b/206729162) Make lz4diff more memory efficient and more streaming
// friendly.
{
const auto decompressed_src = TryDecompressBlob(
src_data,
ToCompressedBlockVec(patch.pb_header.src_info().block_info()),
patch.pb_header.src_info().zero_padding_enabled());
switch (patch.pb_header.inner_type()) {
case InnerPatchType::BSDIFF:
TEST_AND_RETURN_FALSE(bspatch(ToStringView(decompressed_src),
patch.inner_patch,
&decompressed_dst));
break;
case InnerPatchType::PUFFDIFF:
TEST_AND_RETURN_FALSE(puffpatch(ToStringView(decompressed_src),
patch.inner_patch,
&decompressed_dst));
break;
default:
LOG(ERROR) << "Unsupported patch type: "
<< patch.pb_header.inner_type();
return false;
}
const auto decompressed_dst_size =
GetDecompressedSize(patch.pb_header.dst_info().block_info());
decompressed_dst.reserve(decompressed_dst_size);

ApplyInnerPatch(std::move(decompressed_src), patch, &decompressed_dst);

if (!HasPosfixPatches(patch)) {
return TryCompressBlob(
ToStringView(decompressed_dst),
ToCompressedBlockVec(patch.pb_header.dst_info().block_info()),
patch.pb_header.dst_info().zero_padding_enabled(),
patch.pb_header.dst_info().algo(),
sink);
}
auto postfix_patcher =
[&sink,
block_idx = 0,
&dst_block_info = patch.pb_header.dst_info().block_info()](
const uint8_t* data, size_t size) mutable -> size_t {
if (block_idx >= dst_block_info.size()) {
return sink(data, size);
}
const auto& block_info = dst_block_info[block_idx];
TEST_EQ(size, block_info.compressed_length());
DEFER { block_idx++; };
if (block_info.postfix_bspatch().empty()) {
return sink(data, size);
}
if (!block_info.sha256_hash().empty()) {
Blob actual_hash;
TEST_AND_RETURN_FALSE(
HashCalculator::RawHashOfBytes(data, size, &actual_hash));
if (ToStringView(actual_hash) != block_info.sha256_hash()) {
LOG(ERROR) << "Block " << block_info
<< " is corrupted. This usually means the patch generator "
"used a different version of LZ4, or an incompatible LZ4 "
"patch generator was used, or LZ4 produces different "
"output on different platforms. Expected hash: "
<< HexEncode(block_info.sha256_hash())
<< ", actual hash: " << HexEncode(actual_hash);
return 0;
}
}
Blob fixed_block;
TEST_AND_RETURN_FALSE(
bspatch(std::string_view(reinterpret_cast<const char*>(data), size),
block_info.postfix_bspatch(),
&fixed_block));
return sink(fixed_block.data(), fixed_block.size());
};

auto recompressed_dst = TryCompressBlob(
return TryCompressBlob(
ToStringView(decompressed_dst),
ToCompressedBlockVec(patch.pb_header.dst_info().block_info()),
patch.pb_header.dst_info().zero_padding_enabled(),
patch.pb_header.dst_info().algo());
TEST_AND_RETURN_FALSE(recompressed_dst.size() > 0);
// free memory used by |decompressed_dst|.
decompressed_dst = {};

if (HasPosfixPatches(patch)) {
TEST_AND_RETURN_FALSE(
ApplyPostfixPatch(ToStringView(recompressed_dst),
patch.pb_header.dst_info().block_info(),
output));
} else {
*output = std::move(recompressed_dst);
}
patch.pb_header.dst_info().algo(),
postfix_patcher);
}

bool Lz4Patch(std::string_view src_data,
const Lz4diffPatch& patch,
Blob* output) {
Blob blob;
const auto output_size =
GetCompressedSize(patch.pb_header.dst_info().block_info());
blob.reserve(output_size);
TEST_AND_RETURN_FALSE(Lz4Patch(
src_data, patch, [&blob](const uint8_t* data, size_t size) -> size_t {
blob.insert(blob.end(), data, data + size);
return size;
}));
*output = std::move(blob);
return true;
}

} // namespace

bool Lz4Patch(std::string_view src_data,
std::string_view patch_data,
Blob* output) {
Lz4diffPatch patch;
TEST_AND_RETURN_FALSE(ParseLz4DifffPatch(patch_data, &patch));
return Lz4Patch(src_data, patch, output);
}

bool Lz4Patch(std::string_view src_data,
std::string_view patch_data,
const SinkFunc& sink) {
Lz4diffPatch patch;
TEST_AND_RETURN_FALSE(ParseLz4DifffPatch(patch_data, &patch));
return Lz4Patch(src_data, patch, sink);
}

bool Lz4Patch(const Blob& src_data, const Blob& patch_data, Blob* output) {
return Lz4Patch(ToStringView(src_data), ToStringView(patch_data), output);
}
Expand Down
5 changes: 5 additions & 0 deletions lz4diff/lz4patch.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,11 @@
#include "lz4diff_format.h"

namespace chromeos_update_engine {

bool Lz4Patch(std::string_view src_data,
std::string_view patch_data,
const SinkFunc& sink);

bool Lz4Patch(std::string_view src_data,
std::string_view patch_data,
Blob* output);
Expand Down
5 changes: 3 additions & 2 deletions payload_consumer/block_extent_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,9 @@ size_t BlockExtentWriter::ConsumeWithBuffer(const uint8_t* data, size_t count) {

if (buffer_.empty() && count >= cur_extent_size) {
if (!WriteExtent(data, cur_extent, block_size_)) {
LOG(ERROR) << "WriteExtent(" << cur_extent.start_block() << ", " << data
<< ", " << cur_extent_size << ") failed.";
LOG(ERROR) << "WriteExtent(" << cur_extent.start_block() << ", "
<< static_cast<const void*>(data) << ", " << cur_extent_size
<< ") failed.";
// return value is expected to be greater than 0. Return 0 to signal error
// condition
return 0;
Expand Down
14 changes: 10 additions & 4 deletions payload_consumer/install_operation_executor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -268,12 +268,18 @@ bool InstallOperationExecutor::ExecuteLz4diffOperation(
size_t count) {
brillo::Blob src_data;

brillo::Blob dst_data;
TEST_AND_RETURN_FALSE(utils::ReadExtents(
source_fd, operation.src_extents(), &src_data, block_size_));
TEST_AND_RETURN_FALSE(
Lz4Patch(ToStringView(src_data), ToStringView(data, count), &dst_data));
return writer->Write(dst_data.data(), dst_data.size());
TEST_AND_RETURN_FALSE(Lz4Patch(
ToStringView(src_data),
ToStringView(data, count),
[writer(writer.get())](const uint8_t* data, size_t size) -> size_t {
if (!writer->Write(data, size)) {
return 0;
}
return size;
}));
return true;
}

bool InstallOperationExecutor::ExecuteSourceBsdiffOperation(
Expand Down

0 comments on commit 636ba2f

Please sign in to comment.