Skip to content

Commit

Permalink
Add streaming support to LZ4 compress routine
Browse files Browse the repository at this point in the history
In preparation to enable lz4diff streaming mode.
Instead of appending data to a std::vector, the lz4diff APIs
now accept a sink function to which we send computed data.
This way, the caller can choose to cache data in memory,
or stream writes to disk as the data comes.

Bug: 206729162
Test: th
Change-Id: Ib1aea5c1b730d30a1b4814f8d5dd8ce3a8b43826
  • Loading branch information
zhangxp1998 authored and Treehugger Robot committed Feb 24, 2022
1 parent 1f690a4 commit 760c334
Show file tree
Hide file tree
Showing 6 changed files with 122 additions and 62 deletions.
14 changes: 14 additions & 0 deletions common/hash_calculator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -125,4 +125,18 @@ bool HashCalculator::SetContext(const string& context) {
return true;
}

std::string HashCalculator::SHA256Digest(std::string_view blob) {
std::vector<unsigned char> hash;
HashCalculator::RawHashOfBytes(blob.data(), blob.size(), &hash);
return HexEncode(hash);
}

std::string HashCalculator::SHA256Digest(std::vector<unsigned char> blob) {
return SHA256Digest(ToStringView(blob));
}

std::string HashCalculator::SHA256Digest(std::vector<char> blob) {
return SHA256Digest(ToStringView(blob));
}

} // namespace chromeos_update_engine
4 changes: 4 additions & 0 deletions common/hash_calculator.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@ class HashCalculator {
off_t length,
brillo::Blob* out_hash);
static bool RawHashOfFile(const std::string& name, brillo::Blob* out_hash);
static std::string SHA256Digest(std::string_view blob);

static std::string SHA256Digest(std::vector<unsigned char> blob);
static std::string SHA256Digest(std::vector<char> blob);

private:
// If non-empty, the final raw hash. Will only be set to non-empty when
Expand Down
11 changes: 11 additions & 0 deletions common/utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1117,4 +1117,15 @@ std::string HexEncode(const std::string_view blob) noexcept {
return base::HexEncode(blob.data(), blob.size());
}

// Reinterprets |blob|'s bytes as a read-only character view over the same
// memory. The view is only valid while |blob| is alive and unmodified.
[[nodiscard]] std::string_view ToStringView(
    const std::vector<unsigned char>& blob) noexcept {
  const auto* chars = reinterpret_cast<const char*>(blob.data());
  return {chars, blob.size()};
}

// Wraps a raw (pointer, length) buffer in a read-only character view. The
// caller must keep the buffer alive for as long as the view is used.
// static_cast is the correct named cast from void* to an object pointer;
// reinterpret_cast is not needed here.
[[nodiscard]] std::string_view ToStringView(const void* data,
                                            size_t size) noexcept {
  return std::string_view(static_cast<const char*>(data), size);
}

} // namespace chromeos_update_engine
11 changes: 11 additions & 0 deletions common/utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -545,6 +545,17 @@ std::string HexEncode(const std::array<uint8_t, kSize> blob) noexcept {
return base::HexEncode(blob.data(), blob.size());
}

[[nodiscard]] std::string_view ToStringView(
const std::vector<unsigned char>& blob) noexcept;

// Borrows |blob|'s contents as a string_view; the view dangles once |blob|
// is destroyed or reallocates. Marked [[nodiscard]] for consistency with the
// other ToStringView overloads declared above/below — discarding the result
// is always a bug.
[[nodiscard]] constexpr std::string_view ToStringView(
    const std::vector<char>& blob) noexcept {
  return std::string_view{blob.data(), blob.size()};
}

[[nodiscard]] std::string_view ToStringView(const void* data,
size_t size) noexcept;

} // namespace chromeos_update_engine

#define TEST_AND_RETURN_FALSE_ERRNO(_x) \
Expand Down
131 changes: 75 additions & 56 deletions lz4diff/lz4diff_compress.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,17 @@

namespace chromeos_update_engine {

Blob TryCompressBlob(std::string_view blob,
bool TryCompressBlob(std::string_view blob,
const std::vector<CompressedBlock>& block_info,
const bool zero_padding_enabled,
const CompressionAlgorithm compression_algo) {
const CompressionAlgorithm compression_algo,
const SinkFunc& sink) {
size_t uncompressed_size = 0;
size_t compressed_size = 0;
for (const auto& block : block_info) {
CHECK_EQ(uncompressed_size, block.uncompressed_offset)
<< "Compressed block info is expected to be sorted.";
uncompressed_size += block.uncompressed_length;
compressed_size += block.compressed_length;
}
CHECK_EQ(uncompressed_size, blob.size());
Blob output(utils::RoundUp(compressed_size, kBlockSize));
auto hc = LZ4_createStreamHC();
DEFER {
if (hc) {
Expand All @@ -49,69 +46,105 @@ Blob TryCompressBlob(std::string_view blob,
}
};
size_t compressed_offset = 0;
Blob block_buffer;
for (const auto& block : block_info) {
// Execute the increment at end of each loop
DEFER { compressed_offset += block.compressed_length; };
CHECK_LE(compressed_offset + block.compressed_length, output.size());

const auto uncompressed_block =
blob.substr(block.uncompressed_offset, block.uncompressed_length);
if (!block.IsCompressed()) {
std::memcpy(output.data() + compressed_offset,
blob.data() + block.uncompressed_offset,
block.compressed_length);
TEST_EQ(sink(reinterpret_cast<const uint8_t*>(uncompressed_block.data()),
uncompressed_block.size()),
uncompressed_block.size());
continue;
}
block_buffer.resize(block.compressed_length);
// Execute the increment at end of each loop
DEFER {
compressed_offset += block.compressed_length;
block_buffer.clear();
};

int ret = 0;
// LZ4 spec enforces that last op of a compressed block must be an insert op
// of at least 5 bytes. Compressors will try to conform to that requirement
// if the input size is just right. We don't want that. So always give a
// little bit more data.
int src_size = uncompressed_size - block.uncompressed_offset;
uint64_t bytes_written = 0;
switch (compression_algo.type()) {
switch (int src_size = uncompressed_size - block.uncompressed_offset;
compression_algo.type()) {
case CompressionAlgorithm::LZ4HC:
bytes_written = LZ4_compress_HC_destSize(
ret = LZ4_compress_HC_destSize(
hc,
blob.data() + block.uncompressed_offset,
reinterpret_cast<char*>(output.data()) + compressed_offset,
uncompressed_block.data(),
reinterpret_cast<char*>(block_buffer.data()),
&src_size,
block.compressed_length,
compression_algo.level());
break;
case CompressionAlgorithm::LZ4:
bytes_written = LZ4_compress_destSize(
blob.data() + block.uncompressed_offset,
reinterpret_cast<char*>(output.data()) + compressed_offset,
&src_size,
block.compressed_length);
ret =
LZ4_compress_destSize(uncompressed_block.data(),
reinterpret_cast<char*>(block_buffer.data()),
&src_size,
block.compressed_length);
break;
default:
CHECK(false) << "Unrecognized compression algorithm: "
<< compression_algo.type();
break;
LOG(ERROR) << "Unrecognized compression algorithm: "
<< compression_algo.type();
return {};
}
TEST_GT(ret, 0);
const uint64_t bytes_written = ret;
// Last block may have trailing zeros
CHECK_LE(bytes_written, block.compressed_length);
TEST_LE(bytes_written, block.compressed_length);
if (bytes_written < block.compressed_length) {
if (zero_padding_enabled) {
const auto padding = block.compressed_length - bytes_written;
// LOG(INFO) << "Padding: " << padding;
CHECK_LE(compressed_offset + padding + bytes_written, output.size());
std::memmove(output.data() + compressed_offset + padding,
output.data() + compressed_offset,
bytes_written);
CHECK_LE(compressed_offset + padding, output.size());
std::fill(output.data() + compressed_offset,
output.data() + compressed_offset + padding,
0);
std::memmove(
block_buffer.data() + padding, block_buffer.data(), bytes_written);
std::fill(block_buffer.data(), block_buffer.data() + padding, 0);

} else {
std::fill(output.data() + compressed_offset + bytes_written,
output.data() + compressed_offset + block.compressed_length,
std::fill(block_buffer.data() + bytes_written,
block_buffer.data() + block.compressed_length,
0);
}
}
TEST_EQ(sink(block_buffer.data(), block_buffer.size()),
block_buffer.size());
}
// Any trailing data will be copied to the output buffer.
output.insert(output.end(), blob.begin() + uncompressed_size, blob.end());
TEST_EQ(
sink(reinterpret_cast<const uint8_t*>(blob.data()) + uncompressed_size,
blob.size() - uncompressed_size),
blob.size() - uncompressed_size);
return true;
}

// Convenience wrapper over the streaming TryCompressBlob() overload:
// compresses |blob| into an in-memory Blob instead of sending output to a
// caller-provided sink. Returns an empty Blob on failure.
Blob TryCompressBlob(std::string_view blob,
                     const std::vector<CompressedBlock>& block_info,
                     const bool zero_padding_enabled,
                     const CompressionAlgorithm compression_algo) {
  // Walk |block_info| once to verify it is sorted/contiguous and to tally
  // the total uncompressed/compressed sizes.
  size_t total_uncompressed = 0;
  size_t total_compressed = 0;
  for (const auto& block : block_info) {
    CHECK_EQ(total_uncompressed, block.uncompressed_offset)
        << "Compressed block info is expected to be sorted.";
    total_uncompressed += block.uncompressed_length;
    total_compressed += block.compressed_length;
  }
  TEST_EQ(total_uncompressed, blob.size());
  Blob output;
  // Reserve the expected output size up front so the sink below can append
  // without repeated reallocation.
  output.reserve(utils::RoundUp(total_compressed, kBlockSize));
  const auto append_to_output = [&output](const uint8_t* data, size_t size) {
    output.insert(output.end(), data, data + size);
    return size;
  };
  if (!TryCompressBlob(blob,
                       block_info,
                       zero_padding_enabled,
                       compression_algo,
                       append_to_output)) {
    return {};
  }
  return output;
}

Expand Down Expand Up @@ -164,11 +197,6 @@ Blob TryDecompressBlob(std::string_view blob,
block.uncompressed_length,
block.uncompressed_length);
if (bytes_decompressed < 0) {
Blob cluster_hash;
HashCalculator::RawHashOfBytes(
cluster.data(), cluster.size(), &cluster_hash);
Blob blob_hash;
HashCalculator::RawHashOfBytes(blob.data(), blob.size(), &blob_hash);
LOG(FATAL) << "Failed to decompress, " << bytes_decompressed
<< ", output_cursor = "
<< output.size() - block.uncompressed_length
Expand All @@ -177,7 +205,8 @@ Blob TryDecompressBlob(std::string_view blob,
<< ", cluster_size = " << block.compressed_length
<< ", dest capacity = " << block.uncompressed_length
<< ", input margin = " << inputmargin << " "
<< HexEncode(cluster_hash) << " " << HexEncode(blob_hash);
<< HashCalculator::SHA256Digest(cluster) << " "
<< HashCalculator::SHA256Digest(blob);
return {};
}
compressed_offset += block.compressed_length;
Expand All @@ -197,11 +226,6 @@ Blob TryDecompressBlob(std::string_view blob,
return output;
}

[[nodiscard]] std::string_view ToStringView(const Blob& blob) noexcept {
return std::string_view{reinterpret_cast<const char*>(blob.data()),
blob.size()};
}

Blob TryDecompressBlob(const Blob& blob,
const std::vector<CompressedBlock>& block_info,
const bool zero_padding_enabled) {
Expand All @@ -216,11 +240,6 @@ std::ostream& operator<<(std::ostream& out, const CompressedBlock& block) {
return out;
}

[[nodiscard]] std::string_view ToStringView(const void* data,
size_t size) noexcept {
return std::string_view(reinterpret_cast<const char*>(data), size);
}

std::ostream& operator<<(std::ostream& out, const CompressedBlockInfo& info) {
out << "BlockInfo { compressed_length: " << info.compressed_length()
<< ", uncompressed_length: " << info.uncompressed_length()
Expand Down
13 changes: 7 additions & 6 deletions lz4diff/lz4diff_compress.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,12 @@
#define UPDATE_ENGINE_LZ4DIFF_LZ4DIFF_COMPRESS_H_

#include "lz4diff_format.h"

#include <string_view>

namespace chromeos_update_engine {

using SinkFunc = std::function<size_t(const uint8_t*, size_t)>;

// |TryCompressBlob| and |TryDecompressBlob| are inverse function of each other.
// One compresses data into fixed size output chunks, one decompresses fixed
// size blocks.
Expand All @@ -36,6 +37,11 @@ Blob TryCompressBlob(std::string_view blob,
const std::vector<CompressedBlock>& block_info,
const bool zero_padding_enabled,
const CompressionAlgorithm compression_algo);
bool TryCompressBlob(std::string_view blob,
const std::vector<CompressedBlock>& block_info,
const bool zero_padding_enabled,
const CompressionAlgorithm compression_algo,
const SinkFunc& sink);

Blob TryDecompressBlob(std::string_view blob,
const std::vector<CompressedBlock>& block_info,
Expand All @@ -44,11 +50,6 @@ Blob TryDecompressBlob(const Blob& blob,
const std::vector<CompressedBlock>& block_info,
const bool zero_padding_enabled);

[[nodiscard]] std::string_view ToStringView(const Blob& blob) noexcept;

[[nodiscard]] std::string_view ToStringView(const void* data,
size_t size) noexcept;

std::ostream& operator<<(std::ostream& out, const CompressedBlockInfo& info);

std::ostream& operator<<(std::ostream& out, const CompressedBlock& block);
Expand Down

0 comments on commit 760c334

Please sign in to comment.