diff --git a/components/core/src/clp/streaming_compression/passthrough/Compressor.cpp b/components/core/src/clp/streaming_compression/passthrough/Compressor.cpp index d4ab89dbe..57bf36ebb 100644 --- a/components/core/src/clp/streaming_compression/passthrough/Compressor.cpp +++ b/components/core/src/clp/streaming_compression/passthrough/Compressor.cpp @@ -4,8 +4,14 @@ #include "../../ErrorCode.hpp" #include "../../TraceableException.hpp" +#include "../Compressor.hpp" +#include "../Constants.hpp" namespace clp::streaming_compression::passthrough { +Compressor::Compressor() + : ::clp::streaming_compression::Compressor{CompressorType::ZSTD}, + m_compressed_stream_file_writer{nullptr} {} + auto Compressor::write(char const* data, size_t const data_length) -> void { if (nullptr == m_compressed_stream_file_writer) { throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__); diff --git a/components/core/src/clp/streaming_compression/passthrough/Compressor.hpp b/components/core/src/clp/streaming_compression/passthrough/Compressor.hpp index a94086196..9ec291c19 100644 --- a/components/core/src/clp/streaming_compression/passthrough/Compressor.hpp +++ b/components/core/src/clp/streaming_compression/passthrough/Compressor.hpp @@ -7,7 +7,6 @@ #include "../../FileWriter.hpp" #include "../../TraceableException.hpp" #include "../Compressor.hpp" -#include "../Constants.hpp" namespace clp::streaming_compression::passthrough { /** @@ -29,7 +28,7 @@ class Compressor : public ::clp::streaming_compression::Compressor { }; // Constructors - Compressor() : ::clp::streaming_compression::Compressor{CompressorType::Passthrough} {} + Compressor(); // Destructor ~Compressor() override = default; @@ -77,7 +76,7 @@ class Compressor : public ::clp::streaming_compression::Compressor { private: // Variables - FileWriter* m_compressed_stream_file_writer{nullptr}; + FileWriter* m_compressed_stream_file_writer; }; } // namespace clp::streaming_compression::passthrough diff --git a/components/core/src/clp/streaming_compression/zstd/Compressor.cpp b/components/core/src/clp/streaming_compression/zstd/Compressor.cpp index f842ea278..cbfabbe89 100644 --- a/components/core/src/clp/streaming_compression/zstd/Compressor.cpp +++ b/components/core/src/clp/streaming_compression/zstd/Compressor.cpp @@ -1,11 +1,11 @@ #include "Compressor.hpp" #include -#include #include #include +#include "../../Array.hpp" #include "../../ErrorCode.hpp" #include "../../FileWriter.hpp" #include "../../TraceableException.hpp" @@ -35,7 +35,17 @@ auto is_error(size_t result) -> bool { namespace clp::streaming_compression::zstd { Compressor::Compressor() : ::clp::streaming_compression::Compressor{CompressorType::ZSTD}, - m_compression_stream{ZSTD_createCStream()} { + m_compressed_stream_file_writer{nullptr}, + m_compression_stream{ZSTD_createCStream()}, + m_compression_stream_contains_data{false}, + m_compressed_stream_block_size{ZSTD_CStreamOutSize()}, + m_compressed_stream_block_buffer{Array{m_compressed_stream_block_size}}, + m_compressed_stream_block{ + .dst = m_compressed_stream_block_buffer.data(), + .size = m_compressed_stream_block_size, + .pos = 0 + }, + m_uncompressed_stream_pos{0} { if (nullptr == m_compression_stream) { SPDLOG_ERROR("streaming_compression::zstd::Compressor: ZSTD_createCStream() error"); throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__); @@ -51,12 +61,6 @@ auto Compressor::open(FileWriter& file_writer, int compression_level) -> void { throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__); } - // Setup compressed stream parameters - auto const compressed_stream_block_size{ZSTD_CStreamOutSize()}; - m_compressed_stream_block_buffer.resize(compressed_stream_block_size); - m_compressed_stream_block.dst = m_compressed_stream_block_buffer.data(); - m_compressed_stream_block.size = compressed_stream_block_size; - // Setup compression stream auto const init_result{ZSTD_initCStream(m_compression_stream, compression_level)}; if (is_error(init_result)) { diff --git a/components/core/src/clp/streaming_compression/zstd/Compressor.hpp b/components/core/src/clp/streaming_compression/zstd/Compressor.hpp index 32c0f2338..0908b4fcc 100644 --- a/components/core/src/clp/streaming_compression/zstd/Compressor.hpp +++ b/components/core/src/clp/streaming_compression/zstd/Compressor.hpp @@ -2,10 +2,10 @@ #define CLP_STREAMING_COMPRESSION_ZSTD_COMPRESSOR_HPP #include -#include #include +#include "../../Array.hpp" #include "../../ErrorCode.hpp" #include "../../FileWriter.hpp" #include "../../TraceableException.hpp" @@ -92,16 +92,17 @@ class Compressor : public ::clp::streaming_compression::Compressor { private: // Variables - FileWriter* m_compressed_stream_file_writer{nullptr}; + FileWriter* m_compressed_stream_file_writer; // Compressed stream variables ZSTD_CStream* m_compression_stream; - bool m_compression_stream_contains_data{false}; + bool m_compression_stream_contains_data; - ZSTD_outBuffer m_compressed_stream_block{}; - std::vector m_compressed_stream_block_buffer; + size_t m_compressed_stream_block_size; + Array m_compressed_stream_block_buffer; + ZSTD_outBuffer m_compressed_stream_block; - size_t m_uncompressed_stream_pos{0}; + size_t m_uncompressed_stream_pos; }; } // namespace clp::streaming_compression::zstd diff --git a/components/core/tests/test-StreamingCompression.cpp b/components/core/tests/test-StreamingCompression.cpp index cbcb12364..f3083a1cc 100644 --- a/components/core/tests/test-StreamingCompression.cpp +++ b/components/core/tests/test-StreamingCompression.cpp @@ -1,13 +1,14 @@ +#include #include #include #include #include -#include #include #include #include +#include "../src/clp/Array.hpp" #include "../src/clp/ErrorCode.hpp" #include "../src/clp/FileWriter.hpp" #include "../src/clp/ReadOnlyMemoryMappedFile.hpp" @@ -18,12 +19,14 @@ #include "../src/clp/streaming_compression/zstd/Compressor.hpp" #include "../src/clp/streaming_compression/zstd/Decompressor.hpp" +using clp::Array; using clp::ErrorCode_Success; using clp::FileWriter; using clp::streaming_compression::Compressor; using clp::streaming_compression::Decompressor; TEST_CASE("StreamingCompression", "[StreamingCompression]") { + // Initialize constants constexpr size_t cBufferSize{128L * 1024 * 1024}; // 128MB constexpr auto cCompressionChunkSizes = std::to_array( {cBufferSize / 100, @@ -35,41 +38,35 @@ TEST_CASE("StreamingCompression", "[StreamingCompression]") { cBufferSize} ); constexpr size_t cAlphabetLength = 26; - std::string const compressed_file_path{"test_streaming_compressed_file.bin"}; - std::vector compression_chunk_sizes{ - cCompressionChunkSizes.begin(), - cCompressionChunkSizes.end() - }; + + // Initialize compression devices std::unique_ptr compressor; std::unique_ptr decompressor; - SECTION("Initiate zstd single phase compression") { - compression_chunk_sizes.insert(compression_chunk_sizes.begin(), ZSTD_CStreamInSize()); + SECTION("ZStd single phase compression") { compressor = std::make_unique(); decompressor = std::make_unique(); } - SECTION("Initiate passthrough compression") { + SECTION("Passthrough compression") { compressor = std::make_unique(); decompressor = std::make_unique(); } // Initialize buffers - std::vector uncompressed_buffer{}; - uncompressed_buffer.resize(cBufferSize); + Array uncompressed_buffer{cBufferSize}; for (size_t i{0}; i < cBufferSize; ++i) { uncompressed_buffer.at(i) = static_cast(('a' + (i % cAlphabetLength))); } - std::vector decompressed_buffer{}; - decompressed_buffer.resize(cBufferSize); + Array decompressed_buffer{cBufferSize}; // Compress FileWriter file_writer; file_writer.open(compressed_file_path, FileWriter::OpenMode::CREATE_FOR_WRITING); compressor->open(file_writer); - for (auto const chunk_size : compression_chunk_sizes) { + for (auto const chunk_size : cCompressionChunkSizes) { compressor->write(uncompressed_buffer.data(), chunk_size); } compressor->close(); @@ -81,9 +78,9 @@ TEST_CASE("StreamingCompression", "[StreamingCompression]") { decompressor->open(compressed_file_view.data(), compressed_file_view.size()); size_t num_uncompressed_bytes{0}; - for (auto const chunk_size : compression_chunk_sizes) { + for (auto const chunk_size : cCompressionChunkSizes) { // Clear the buffer to ensure that we are not comparing values from a previous test - std::fill(decompressed_buffer.begin(), decompressed_buffer.end(), 0); + std::ranges::fill(decompressed_buffer.begin(), decompressed_buffer.end(), 0); REQUIRE( (ErrorCode_Success == decompressor->get_decompressed_stream_region(