Skip to content

Commit

Permalink
Adress review comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Bill-hbrhbr committed Nov 21, 2024
1 parent 719f9ee commit 1c66a07
Show file tree
Hide file tree
Showing 6 changed files with 64 additions and 59 deletions.
15 changes: 4 additions & 11 deletions components/core/src/clp/streaming_compression/Compressor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,11 @@ class Compressor : public WriterInterface {
// Destructor
virtual ~Compressor() = default;

// Explicitly disable copy constructor/assignment and enable the move version
// Delete copy constructor and assignment operator
Compressor(Compressor const&) = delete;
auto operator=(Compressor const&) -> Compressor& = delete;

// Default move constructor and assignment operator
Compressor(Compressor&&) noexcept = default;
auto operator=(Compressor&&) noexcept -> Compressor& = default;

Expand All @@ -64,23 +65,15 @@ class Compressor : public WriterInterface {

// Methods
/**
* Closes the compression stream
* Closes the compressor
*/
virtual auto close() -> void = 0;

/**
* Initializes the compression stream
* @param file_writer
*/
virtual auto open(FileWriter& file_writer) -> void { open(file_writer, 0); }

/**
* Initializes the compression stream with the given compression level
* @param file_writer
* @param compression_level
*/
virtual auto open(FileWriter& file_writer, [[maybe_unused]] int const compression_level) -> void
= 0;
virtual auto open(FileWriter& file_writer) -> void = 0;

private:
// Variables
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,7 @@ auto Compressor::close() -> void {
m_compressed_stream_file_writer = nullptr;
}

auto Compressor::open(FileWriter& file_writer, [[maybe_unused]] int const compression_level)
-> void {
auto Compressor::open(FileWriter& file_writer) -> void {
m_compressed_stream_file_writer = &file_writer;
}
} // namespace clp::streaming_compression::passthrough
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ class Compressor : public ::clp::streaming_compression::Compressor {
// Destructor
~Compressor() override = default;

// Explicitly disable copy constructor/assignment and enable the move version
// Delete copy constructor and assignment operator
Compressor(Compressor const&) = delete;
auto operator=(Compressor const&) -> Compressor& = delete;

// Default move constructor and assignment operator
Compressor(Compressor&&) noexcept = default;
auto operator=(Compressor&&) noexcept -> Compressor& = default;

Expand Down Expand Up @@ -71,10 +72,8 @@ class Compressor : public ::clp::streaming_compression::Compressor {
/**
* Initializes the compression stream
* @param file_writer
* @param compression_level
*/
auto
open(FileWriter& file_writer, [[maybe_unused]] int const compression_level) -> void override;
auto open(FileWriter& file_writer) -> void override;

private:
// Variables
Expand Down
35 changes: 25 additions & 10 deletions components/core/src/clp/streaming_compression/zstd/Compressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,26 @@
#include "../Compressor.hpp"
#include "../Constants.hpp"

namespace {
/**
* Checks if a value returned by ZStd function indicates an error code.
*
* For most ZStd functions that return `size_t` results, instead of returning a union type that can
* either be a valid result or an error code, an unanimous `size_t` type is returned.
* Usually, if the return value exceeds the maximum possible value of valid results, it is treated
* as an error code. However, the exact behavior is function-dependent, so ZStd provides:
* 1. A value checking function `ZSTD_isError`
* 2. A size_t <-> error_code_enum mapping function `ZSTD_getErrorCode`.
* See also: https://facebook.github.io/zstd/zstd_manual.html
*
* @param result A `size_t` type result returned by ZStd functions
* @return Whether the result is an error code and indicates an error has occurred
*/
auto is_error(size_t result) -> bool {
return 0 != ZSTD_isError(result) && ZSTD_error_no_error != ZSTD_getErrorCode(result);
}
} // namespace

namespace clp::streaming_compression::zstd {
Compressor::Compressor()
: ::clp::streaming_compression::Compressor{CompressorType::ZSTD},
Expand All @@ -26,7 +46,7 @@ Compressor::~Compressor() {
ZSTD_freeCStream(m_compression_stream);
}

auto Compressor::open(FileWriter& file_writer, int const compression_level) -> void {
auto Compressor::open(FileWriter& file_writer, int compression_level) -> void {
if (nullptr != m_compressed_stream_file_writer) {
throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__);
}
Expand All @@ -39,7 +59,7 @@ auto Compressor::open(FileWriter& file_writer, int const compression_level) -> v

// Setup compression stream
auto const init_result{ZSTD_initCStream(m_compression_stream, compression_level)};
if (zstd_is_error(init_result)) {
if (is_error(init_result)) {
SPDLOG_ERROR(
"streaming_compression::zstd::Compressor: ZSTD_initCStream() error: {}",
ZSTD_getErrorName(init_result)
Expand Down Expand Up @@ -82,7 +102,7 @@ auto Compressor::write(char const* data, size_t data_length) -> void {
&m_compressed_stream_block,
&uncompressed_stream_block
)};
if (zstd_is_error(compress_result)) {
if (is_error(compress_result)) {
SPDLOG_ERROR(
"streaming_compression::zstd::Compressor: ZSTD_compressStream() error: {}",
ZSTD_getErrorName(compress_result)
Expand Down Expand Up @@ -110,7 +130,7 @@ auto Compressor::flush() -> void {

m_compressed_stream_block.pos = 0;
auto const end_stream_result{ZSTD_endStream(m_compression_stream, &m_compressed_stream_block)};
if (zstd_is_error(end_stream_result)) {
if (is_error(end_stream_result)) {
// Note: Output buffer is large enough that it is guaranteed to have enough room to be
// able to flush the entire buffer, so this can only be an error
SPDLOG_ERROR(
Expand Down Expand Up @@ -144,7 +164,7 @@ auto Compressor::flush_without_ending_frame() -> void {
while (true) {
m_compressed_stream_block.pos = 0;
auto const flush_result{ZSTD_flushStream(m_compression_stream, &m_compressed_stream_block)};
if (zstd_is_error(flush_result)) {
if (is_error(flush_result)) {
SPDLOG_ERROR(
"streaming_compression::zstd::Compressor: ZSTD_compressStream2() error: {}",
ZSTD_getErrorName(flush_result)
Expand All @@ -162,9 +182,4 @@ auto Compressor::flush_without_ending_frame() -> void {
}
}
}

auto Compressor::zstd_is_error(size_t size_t_function_result) -> bool {
return ZSTD_isError(size_t_function_result) != 0
&& ZSTD_error_no_error != ZSTD_getErrorCode(size_t_function_result);
}
} // namespace clp::streaming_compression::zstd
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ class Compressor : public ::clp::streaming_compression::Compressor {
// Destructor
~Compressor() override;

// Explicitly disable copy constructor/assignment and enable the move version
// Delete copy constructor and assignment operator
Compressor(Compressor const&) = delete;
auto operator=(Compressor const&) -> Compressor& = delete;

// Default move constructor and assignment operator
Compressor(Compressor&&) noexcept = default;
auto operator=(Compressor&&) noexcept -> Compressor& = default;

Expand Down Expand Up @@ -82,7 +83,7 @@ class Compressor : public ::clp::streaming_compression::Compressor {
* @param file_writer
* @param compression_level
*/
auto open(FileWriter& file_writer, int const compression_level) -> void override;
auto open(FileWriter& file_writer, int compression_level) -> void;

/**
* Flushes the stream without ending the current frame
Expand All @@ -101,11 +102,6 @@ class Compressor : public ::clp::streaming_compression::Compressor {
std::vector<char> m_compressed_stream_block_buffer;

size_t m_uncompressed_stream_pos{0};

/**
* Tells if a `size_t` ZStd function result is an error code and is not `ZSTD_error_no_error`
*/
[[nodiscard]] static auto zstd_is_error(size_t size_t_function_result) -> bool;
};
} // namespace clp::streaming_compression::zstd

Expand Down
53 changes: 28 additions & 25 deletions components/core/tests/test-StreamingCompression.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,28 +23,26 @@ using clp::FileWriter;
using clp::streaming_compression::Compressor;
using clp::streaming_compression::Decompressor;

namespace {
constexpr size_t cUncompressedDataSize{128L * 1024 * 1024}; // 128MB
constexpr auto cCompressionChunkSizes = std::to_array<size_t>(
{cUncompressedDataSize / 100,
cUncompressedDataSize / 50,
cUncompressedDataSize / 25,
cUncompressedDataSize / 10,
cUncompressedDataSize / 5,
cUncompressedDataSize / 2,
cUncompressedDataSize}
);
constexpr size_t cUncompressedDataPatternPeriod = 26; // lower-case alphabet
} // namespace

TEST_CASE("StreamingCompression", "[StreamingCompression]") {
constexpr size_t cBufferSize{128L * 1024 * 1024}; // 128MB
constexpr auto cCompressionChunkSizes = std::to_array<size_t>(
{cBufferSize / 100,
cBufferSize / 50,
cBufferSize / 25,
cBufferSize / 10,
cBufferSize / 5,
cBufferSize / 2,
cBufferSize}
);
constexpr size_t cAlphabetLength = 26;

std::string const compressed_file_path{"test_streaming_compressed_file.bin"};
std::vector<size_t> compression_chunk_sizes{
cCompressionChunkSizes.begin(),
cCompressionChunkSizes.end()
};
std::unique_ptr<Compressor> compressor{};
std::unique_ptr<Decompressor> decompressor{};
std::unique_ptr<Compressor> compressor;
std::unique_ptr<Decompressor> decompressor;

SECTION("Initiate zstd single phase compression") {
compression_chunk_sizes.insert(compression_chunk_sizes.begin(), ZSTD_CStreamInSize());
Expand All @@ -59,13 +57,13 @@ TEST_CASE("StreamingCompression", "[StreamingCompression]") {

// Initialize buffers
std::vector<char> uncompressed_buffer{};
uncompressed_buffer.resize(cUncompressedDataSize);
for (size_t i{0}; i < cUncompressedDataSize; ++i) {
uncompressed_buffer.at(i) = ((char)('a' + (i % cUncompressedDataPatternPeriod)));
uncompressed_buffer.resize(cBufferSize);
for (size_t i{0}; i < cBufferSize; ++i) {
uncompressed_buffer.at(i) = static_cast<char>(('a' + (i % cAlphabetLength)));
}

std::vector<char> decompressed_buffer{};
decompressed_buffer.resize(cUncompressedDataSize);
decompressed_buffer.resize(cBufferSize);

// Compress
FileWriter file_writer;
Expand All @@ -82,19 +80,24 @@ TEST_CASE("StreamingCompression", "[StreamingCompression]") {
auto const compressed_file_view{memory_mapped_compressed_file.get_view()};
decompressor->open(compressed_file_view.data(), compressed_file_view.size());

size_t uncompressed_bytes{0};
size_t num_uncompressed_bytes{0};
for (auto const chunk_size : compression_chunk_sizes) {
memset(decompressed_buffer.data(), 0, cUncompressedDataSize);
// Clear the buffer to ensure that we are not comparing values from a previous test
std::fill(decompressed_buffer.begin(), decompressed_buffer.end(), 0);
REQUIRE(
(ErrorCode_Success
== decompressor->get_decompressed_stream_region(
uncompressed_bytes,
num_uncompressed_bytes,
decompressed_buffer.data(),
chunk_size
))
);
REQUIRE((memcmp(uncompressed_buffer.data(), decompressed_buffer.data(), chunk_size) == 0));
uncompressed_bytes += chunk_size;
REQUIRE(std::equal(
uncompressed_buffer.begin(),
uncompressed_buffer.begin() + chunk_size,
decompressed_buffer.begin()
));
num_uncompressed_bytes += chunk_size;
}

// Cleanup
Expand Down

0 comments on commit 1c66a07

Please sign in to comment.