From 1c66a07dab08f9dc61311521202b7475b059b987 Mon Sep 17 00:00:00 2001 From: Bingran Hu Date: Thu, 21 Nov 2024 02:39:18 -0500 Subject: [PATCH] Adress review comments --- .../clp/streaming_compression/Compressor.hpp | 15 ++---- .../passthrough/Compressor.cpp | 3 +- .../passthrough/Compressor.hpp | 7 ++- .../streaming_compression/zstd/Compressor.cpp | 35 ++++++++---- .../streaming_compression/zstd/Compressor.hpp | 10 ++-- .../core/tests/test-StreamingCompression.cpp | 53 ++++++++++--------- 6 files changed, 64 insertions(+), 59 deletions(-) diff --git a/components/core/src/clp/streaming_compression/Compressor.hpp b/components/core/src/clp/streaming_compression/Compressor.hpp index 602a16afc..4963fbc13 100644 --- a/components/core/src/clp/streaming_compression/Compressor.hpp +++ b/components/core/src/clp/streaming_compression/Compressor.hpp @@ -36,10 +36,11 @@ class Compressor : public WriterInterface { // Destructor virtual ~Compressor() = default; - // Explicitly disable copy constructor/assignment and enable the move version + // Delete copy constructor and assignment operator Compressor(Compressor const&) = delete; auto operator=(Compressor const&) -> Compressor& = delete; + // Default move constructor and assignment operator Compressor(Compressor&&) noexcept = default; auto operator=(Compressor&&) noexcept -> Compressor& = default; @@ -64,7 +65,7 @@ class Compressor : public WriterInterface { // Methods /** - * Closes the compression stream + * Closes the compressor */ virtual auto close() -> void = 0; @@ -72,15 +73,7 @@ class Compressor : public WriterInterface { * Initializes the compression stream * @param file_writer */ - virtual auto open(FileWriter& file_writer) -> void { open(file_writer, 0); } - - /** - * Initializes the compression stream with the given compression level - * @param file_writer - * @param compression_level - */ - virtual auto open(FileWriter& file_writer, [[maybe_unused]] int const compression_level) -> void - = 0; + virtual auto open(FileWriter& file_writer) -> void = 0; private: // Variables diff --git a/components/core/src/clp/streaming_compression/passthrough/Compressor.cpp b/components/core/src/clp/streaming_compression/passthrough/Compressor.cpp index 5f1914a3b..d4ab89dbe 100644 --- a/components/core/src/clp/streaming_compression/passthrough/Compressor.cpp +++ b/components/core/src/clp/streaming_compression/passthrough/Compressor.cpp @@ -42,8 +42,7 @@ auto Compressor::close() -> void { m_compressed_stream_file_writer = nullptr; } -auto Compressor::open(FileWriter& file_writer, [[maybe_unused]] int const compression_level) - -> void { +auto Compressor::open(FileWriter& file_writer) -> void { m_compressed_stream_file_writer = &file_writer; } } // namespace clp::streaming_compression::passthrough diff --git a/components/core/src/clp/streaming_compression/passthrough/Compressor.hpp b/components/core/src/clp/streaming_compression/passthrough/Compressor.hpp index 8674e8937..a94086196 100644 --- a/components/core/src/clp/streaming_compression/passthrough/Compressor.hpp +++ b/components/core/src/clp/streaming_compression/passthrough/Compressor.hpp @@ -34,10 +34,11 @@ class Compressor : public ::clp::streaming_compression::Compressor { // Destructor ~Compressor() override = default; - // Explicitly disable copy constructor/assignment and enable the move version + // Delete copy constructor and assignment operator Compressor(Compressor const&) = delete; auto operator=(Compressor const&) -> Compressor& = delete; + // Default move constructor and assignment operator Compressor(Compressor&&) noexcept = default; auto operator=(Compressor&&) noexcept -> Compressor& = default; @@ -71,10 +72,8 @@ class Compressor : public ::clp::streaming_compression::Compressor { /** * Initializes the compression stream * @param file_writer - * @param compression_level */ - auto - open(FileWriter& file_writer, [[maybe_unused]] int const compression_level) -> void override; + auto open(FileWriter& file_writer) -> void override; private: // Variables diff --git a/components/core/src/clp/streaming_compression/zstd/Compressor.cpp b/components/core/src/clp/streaming_compression/zstd/Compressor.cpp index 723244b3a..f842ea278 100644 --- a/components/core/src/clp/streaming_compression/zstd/Compressor.cpp +++ b/components/core/src/clp/streaming_compression/zstd/Compressor.cpp @@ -12,6 +12,26 @@ #include "../Compressor.hpp" #include "../Constants.hpp" +namespace { +/** + * Checks if a value returned by ZStd function indicates an error code. + * + * For most ZStd functions that return `size_t` results, instead of returning a union type that can + * either be a valid result or an error code, an unanimous `size_t` type is returned. + * Usually, if the return value exceeds the maximum possible value of valid results, it is treated + * as an error code. However, the exact behavior is function-dependent, so ZStd provides: + * 1. A value checking function `ZSTD_isError` + * 2. A size_t <-> error_code_enum mapping function `ZSTD_getErrorCode`. + * See also: https://facebook.github.io/zstd/zstd_manual.html + * + * @param result A `size_t` type result returned by ZStd functions + * @return Whether the result is an error code and indicates an error has occurred + */ +auto is_error(size_t result) -> bool { + return 0 != ZSTD_isError(result) && ZSTD_error_no_error != ZSTD_getErrorCode(result); +} +} // namespace + namespace clp::streaming_compression::zstd { Compressor::Compressor() : ::clp::streaming_compression::Compressor{CompressorType::ZSTD}, @@ -26,7 +46,7 @@ Compressor::~Compressor() { ZSTD_freeCStream(m_compression_stream); } -auto Compressor::open(FileWriter& file_writer, int const compression_level) -> void { +auto Compressor::open(FileWriter& file_writer, int compression_level) -> void { if (nullptr != m_compressed_stream_file_writer) { throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__); } @@ -39,7 +59,7 @@ auto Compressor::open(FileWriter& file_writer, int const compression_level) -> v // Setup compression stream auto const init_result{ZSTD_initCStream(m_compression_stream, compression_level)}; - if (zstd_is_error(init_result)) { + if (is_error(init_result)) { SPDLOG_ERROR( "streaming_compression::zstd::Compressor: ZSTD_initCStream() error: {}", ZSTD_getErrorName(init_result) @@ -82,7 +102,7 @@ auto Compressor::write(char const* data, size_t data_length) -> void { &m_compressed_stream_block, &uncompressed_stream_block )}; - if (zstd_is_error(compress_result)) { + if (is_error(compress_result)) { SPDLOG_ERROR( "streaming_compression::zstd::Compressor: ZSTD_compressStream() error: {}", ZSTD_getErrorName(compress_result) @@ -110,7 +130,7 @@ auto Compressor::flush() -> void { m_compressed_stream_block.pos = 0; auto const end_stream_result{ZSTD_endStream(m_compression_stream, &m_compressed_stream_block)}; - if (zstd_is_error(end_stream_result)) { + if (is_error(end_stream_result)) { // Note: Output buffer is large enough that it is guaranteed to have enough room to be // able to flush the entire buffer, so this can only be an error SPDLOG_ERROR( @@ -144,7 +164,7 @@ auto Compressor::flush_without_ending_frame() -> void { while (true) { m_compressed_stream_block.pos = 0; auto const flush_result{ZSTD_flushStream(m_compression_stream, &m_compressed_stream_block)}; - if (zstd_is_error(flush_result)) { + if (is_error(flush_result)) { SPDLOG_ERROR( "streaming_compression::zstd::Compressor: ZSTD_compressStream2() error: {}", ZSTD_getErrorName(flush_result) @@ -162,9 +182,4 @@ auto Compressor::flush_without_ending_frame() -> void { } } } - -auto Compressor::zstd_is_error(size_t size_t_function_result) -> bool { - return ZSTD_isError(size_t_function_result) != 0 - && ZSTD_error_no_error != ZSTD_getErrorCode(size_t_function_result); -} } // namespace clp::streaming_compression::zstd diff --git a/components/core/src/clp/streaming_compression/zstd/Compressor.hpp b/components/core/src/clp/streaming_compression/zstd/Compressor.hpp index 0341c8bbf..32c0f2338 100644 --- a/components/core/src/clp/streaming_compression/zstd/Compressor.hpp +++ b/components/core/src/clp/streaming_compression/zstd/Compressor.hpp @@ -35,10 +35,11 @@ class Compressor : public ::clp::streaming_compression::Compressor { // Destructor ~Compressor() override; - // Explicitly disable copy constructor/assignment and enable the move version + // Delete copy constructor and assignment operator Compressor(Compressor const&) = delete; auto operator=(Compressor const&) -> Compressor& = delete; + // Default move constructor and assignment operator Compressor(Compressor&&) noexcept = default; auto operator=(Compressor&&) noexcept -> Compressor& = default; @@ -82,7 +83,7 @@ class Compressor : public ::clp::streaming_compression::Compressor { * @param file_writer * @param compression_level */ - auto open(FileWriter& file_writer, int const compression_level) -> void override; + auto open(FileWriter& file_writer, int compression_level) -> void; /** * Flushes the stream without ending the current frame @@ -101,11 +102,6 @@ class Compressor : public ::clp::streaming_compression::Compressor { std::vector m_compressed_stream_block_buffer; size_t m_uncompressed_stream_pos{0}; - - /** - * Tells if a `size_t` ZStd function result is an error code and is not `ZSTD_error_no_error` - */ - [[nodiscard]] static auto zstd_is_error(size_t size_t_function_result) -> bool; }; } // namespace clp::streaming_compression::zstd diff --git a/components/core/tests/test-StreamingCompression.cpp b/components/core/tests/test-StreamingCompression.cpp index 63d0fb4d8..cbcb12364 100644 --- a/components/core/tests/test-StreamingCompression.cpp +++ b/components/core/tests/test-StreamingCompression.cpp @@ -23,28 +23,26 @@ using clp::FileWriter; using clp::streaming_compression::Compressor; using clp::streaming_compression::Decompressor; -namespace { -constexpr size_t cUncompressedDataSize{128L * 1024 * 1024}; // 128MB -constexpr auto cCompressionChunkSizes = std::to_array( - {cUncompressedDataSize / 100, - cUncompressedDataSize / 50, - cUncompressedDataSize / 25, - cUncompressedDataSize / 10, - cUncompressedDataSize / 5, - cUncompressedDataSize / 2, - cUncompressedDataSize} -); -constexpr size_t cUncompressedDataPatternPeriod = 26; // lower-case alphabet -} // namespace - TEST_CASE("StreamingCompression", "[StreamingCompression]") { + constexpr size_t cBufferSize{128L * 1024 * 1024}; // 128MB + constexpr auto cCompressionChunkSizes = std::to_array( + {cBufferSize / 100, + cBufferSize / 50, + cBufferSize / 25, + cBufferSize / 10, + cBufferSize / 5, + cBufferSize / 2, + cBufferSize} + ); + constexpr size_t cAlphabetLength = 26; + std::string const compressed_file_path{"test_streaming_compressed_file.bin"}; std::vector compression_chunk_sizes{ cCompressionChunkSizes.begin(), cCompressionChunkSizes.end() }; - std::unique_ptr compressor{}; - std::unique_ptr decompressor{}; + std::unique_ptr compressor; + std::unique_ptr decompressor; SECTION("Initiate zstd single phase compression") { compression_chunk_sizes.insert(compression_chunk_sizes.begin(), ZSTD_CStreamInSize()); @@ -59,13 +57,13 @@ TEST_CASE("StreamingCompression", "[StreamingCompression]") { // Initialize buffers std::vector uncompressed_buffer{}; - uncompressed_buffer.resize(cUncompressedDataSize); - for (size_t i{0}; i < cUncompressedDataSize; ++i) { - uncompressed_buffer.at(i) = ((char)('a' + (i % cUncompressedDataPatternPeriod))); + uncompressed_buffer.resize(cBufferSize); + for (size_t i{0}; i < cBufferSize; ++i) { + uncompressed_buffer.at(i) = static_cast(('a' + (i % cAlphabetLength))); } std::vector decompressed_buffer{}; - decompressed_buffer.resize(cUncompressedDataSize); + decompressed_buffer.resize(cBufferSize); // Compress FileWriter file_writer; @@ -82,19 +80,24 @@ TEST_CASE("StreamingCompression", "[StreamingCompression]") { auto const compressed_file_view{memory_mapped_compressed_file.get_view()}; decompressor->open(compressed_file_view.data(), compressed_file_view.size()); - size_t uncompressed_bytes{0}; + size_t num_uncompressed_bytes{0}; for (auto const chunk_size : compression_chunk_sizes) { - memset(decompressed_buffer.data(), 0, cUncompressedDataSize); + // Clear the buffer to ensure that we are not comparing values from a previous test + std::fill(decompressed_buffer.begin(), decompressed_buffer.end(), 0); REQUIRE( (ErrorCode_Success == decompressor->get_decompressed_stream_region( - uncompressed_bytes, + num_uncompressed_bytes, decompressed_buffer.data(), chunk_size )) ); - REQUIRE((memcmp(uncompressed_buffer.data(), decompressed_buffer.data(), chunk_size) == 0)); - uncompressed_bytes += chunk_size; + REQUIRE(std::equal( + uncompressed_buffer.begin(), + uncompressed_buffer.begin() + chunk_size, + decompressed_buffer.begin() + )); + num_uncompressed_bytes += chunk_size; } // Cleanup