diff --git a/components/core/.clang-format b/components/core/.clang-format index ff65adbae..4d0d3a87c 100644 --- a/components/core/.clang-format +++ b/components/core/.clang-format @@ -4,7 +4,7 @@ IncludeCategories: # NOTE: A header is grouped by first matching regex # Library headers. Update when adding new libraries. # NOTE: clang-format retains leading white-space on a line in violation of the YAML spec. - - Regex: "<(absl|antlr4|archive|boost|bsoncxx|catch2|curl|date|fmt|json|log_surgeon|mongocxx\ + - Regex: "<(absl|antlr4|archive|boost|bsoncxx|catch2|curl|date|fmt|json|log_surgeon|lzma|mongocxx\ |msgpack|mysql|openssl|outcome|regex_utils|simdjson|spdlog|sqlite3|string_utils|yaml-cpp|zstd)" Priority: 3 # C system headers diff --git a/components/core/CMakeLists.txt b/components/core/CMakeLists.txt index e5c9b06c8..9e9c03464 100644 --- a/components/core/CMakeLists.txt +++ b/components/core/CMakeLists.txt @@ -11,13 +11,16 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON) # Set general compressor set(GENERAL_COMPRESSOR "zstd" CACHE STRING "The general-purpose compressor used as the 2nd-stage compressor") -set_property(CACHE GENERAL_COMPRESSOR PROPERTY STRINGS passthrough zstd) +set_property(CACHE GENERAL_COMPRESSOR PROPERTY STRINGS passthrough zstd lzma) if ("${GENERAL_COMPRESSOR}" STREQUAL "passthrough") add_definitions(-DUSE_PASSTHROUGH_COMPRESSION=1) message(STATUS "Using passthrough compression") elseif ("${GENERAL_COMPRESSOR}" STREQUAL "zstd") add_definitions(-DUSE_ZSTD_COMPRESSION=1) message(STATUS "Using Zstandard compression") +elseif ("${GENERAL_COMPRESSOR}" STREQUAL "lzma") + add_definitions(-DUSE_LZMA_COMPRESSION=1) + message(STATUS "Using Lempel–Ziv–Markov chain Algorithm compression") else() message(SEND_ERROR "GENERAL_COMPRESSOR=${GENERAL_COMPRESSOR} is unimplemented.") endif() @@ -224,6 +227,19 @@ else() message(FATAL_ERROR "Could not find ${CLP_LIBS_STRING} libraries for ZStd") endif() +# Find and setup LZMA Library +# Notice that we don't have support to switch between static and shared libraries. +# TODO: add a script in ./cmake/Modules to resolve .a vs. .so +find_package(LibLZMA REQUIRED) +if(LIBLZMA_FOUND) + message(STATUS "Found LIBLZMA_FOUND ${LIBLZMA_VERSION_STRING}") + message(STATUS "Lzma library location: ${LIBLZMA_LIBRARIES}") +else() + message(FATAL_ERROR "Could not find ${CLP_LIBS_STRING} libraries for LIBLZMA_FOUND") +endif() +include_directories(${LIBLZMA_INCLUDE_DIRS}) +message("LZMA Include Dir: ${LIBLZMA_INCLUDE_DIRS}") + # sqlite dependencies set(sqlite_DYNAMIC_LIBS "dl;m;pthread") include(cmake/Modules/FindLibraryDependencies.cmake) diff --git a/components/core/src/clp/DictionaryReader.hpp b/components/core/src/clp/DictionaryReader.hpp index 694240ad5..e015c861a 100644 --- a/components/core/src/clp/DictionaryReader.hpp +++ b/components/core/src/clp/DictionaryReader.hpp @@ -10,6 +10,7 @@ #include "dictionary_utils.hpp" #include "DictionaryEntry.hpp" #include "FileReader.hpp" +#include "streaming_compression/lzma/Decompressor.hpp" #include "streaming_compression/passthrough/Decompressor.hpp" #include "streaming_compression/zstd/Decompressor.hpp" #include "Utils.hpp" @@ -115,6 +116,9 @@ class DictionaryReader { #elif USE_ZSTD_COMPRESSION streaming_compression::zstd::Decompressor m_dictionary_decompressor; streaming_compression::zstd::Decompressor m_segment_index_decompressor; +#elif USE_LZMA_COMPRESSION + streaming_compression::lzma::Decompressor m_dictionary_decompressor; + streaming_compression::lzma::Decompressor m_segment_index_decompressor; #else static_assert(false, "Unsupported compression mode."); #endif diff --git a/components/core/src/clp/DictionaryWriter.hpp b/components/core/src/clp/DictionaryWriter.hpp index 7cac9d5aa..a75a29ed1 100644 --- a/components/core/src/clp/DictionaryWriter.hpp +++ b/components/core/src/clp/DictionaryWriter.hpp @@ -8,6 +8,8 @@ #include "Defs.h" #include "FileWriter.hpp" #include "spdlog_with_specializations.hpp" +#include "streaming_compression/lzma/Compressor.hpp" +#include "streaming_compression/lzma/Decompressor.hpp" #include "streaming_compression/passthrough/Compressor.hpp" #include "streaming_compression/passthrough/Decompressor.hpp" #include "streaming_compression/zstd/Compressor.hpp" @@ -97,6 +99,9 @@ class DictionaryWriter { #elif USE_ZSTD_COMPRESSION streaming_compression::zstd::Compressor m_dictionary_compressor; streaming_compression::zstd::Compressor m_segment_index_compressor; +#elif USE_LZMA_COMPRESSION + streaming_compression::lzma::Compressor m_dictionary_compressor; + streaming_compression::lzma::Compressor m_segment_index_compressor; #else static_assert(false, "Unsupported compression mode."); #endif diff --git a/components/core/src/clp/clg/CMakeLists.txt b/components/core/src/clp/clg/CMakeLists.txt index a0ca5e9d0..b316ae4c2 100644 --- a/components/core/src/clp/clg/CMakeLists.txt +++ b/components/core/src/clp/clg/CMakeLists.txt @@ -91,6 +91,11 @@ set( ../streaming_archive/writer/Segment.hpp ../streaming_compression/Constants.hpp ../streaming_compression/Decompressor.hpp + ../streaming_compression/lzma/Compressor.cpp + ../streaming_compression/lzma/Compressor.hpp + ../streaming_compression/lzma/Constants.hpp + ../streaming_compression/lzma/Decompressor.cpp + ../streaming_compression/lzma/Decompressor.hpp ../streaming_compression/passthrough/Compressor.cpp ../streaming_compression/passthrough/Compressor.hpp ../streaming_compression/passthrough/Decompressor.cpp diff --git a/components/core/src/clp/clo/CMakeLists.txt b/components/core/src/clp/clo/CMakeLists.txt index 931bffeaf..082eb488c 100644 --- a/components/core/src/clp/clo/CMakeLists.txt +++ b/components/core/src/clp/clo/CMakeLists.txt @@ -91,6 +91,11 @@ set( ../streaming_archive/writer/Segment.hpp ../streaming_compression/Constants.hpp ../streaming_compression/Decompressor.hpp + ../streaming_compression/lzma/Compressor.cpp + ../streaming_compression/lzma/Compressor.hpp + ../streaming_compression/lzma/Constants.hpp + ../streaming_compression/lzma/Decompressor.cpp + ../streaming_compression/lzma/Decompressor.hpp ../streaming_compression/passthrough/Compressor.cpp ../streaming_compression/passthrough/Compressor.hpp ../streaming_compression/passthrough/Decompressor.cpp diff --git a/components/core/src/clp/clp/CMakeLists.txt b/components/core/src/clp/clp/CMakeLists.txt index 53342f3a9..becacbc36 100644 --- a/components/core/src/clp/clp/CMakeLists.txt +++ b/components/core/src/clp/clp/CMakeLists.txt @@ -116,6 +116,11 @@ set( ../streaming_archive/writer/Segment.hpp ../streaming_archive/writer/utils.cpp ../streaming_archive/writer/utils.hpp + ../streaming_compression/lzma/Compressor.cpp + ../streaming_compression/lzma/Compressor.hpp + ../streaming_compression/lzma/Constants.hpp + ../streaming_compression/lzma/Decompressor.cpp + ../streaming_compression/lzma/Decompressor.hpp ../streaming_compression/Compressor.hpp ../streaming_compression/Constants.hpp ../streaming_compression/Decompressor.hpp diff --git a/components/core/src/clp/make_dictionaries_readable/CMakeLists.txt b/components/core/src/clp/make_dictionaries_readable/CMakeLists.txt index fd62a39fb..3bcbdfdc4 100644 --- a/components/core/src/clp/make_dictionaries_readable/CMakeLists.txt +++ b/components/core/src/clp/make_dictionaries_readable/CMakeLists.txt @@ -23,6 +23,8 @@ set( ../ReadOnlyMemoryMappedFile.hpp ../spdlog_with_specializations.hpp ../streaming_compression/Decompressor.hpp + ../streaming_compression/lzma/Decompressor.cpp + ../streaming_compression/lzma/Decompressor.hpp ../streaming_compression/passthrough/Decompressor.cpp ../streaming_compression/passthrough/Decompressor.hpp ../streaming_compression/zstd/Decompressor.cpp diff --git a/components/core/src/clp/streaming_archive/reader/Segment.hpp b/components/core/src/clp/streaming_archive/reader/Segment.hpp index 9ed40ea60..fe53f5a35 100644 --- a/components/core/src/clp/streaming_archive/reader/Segment.hpp +++ b/components/core/src/clp/streaming_archive/reader/Segment.hpp @@ -8,6 +8,7 @@ #include "../../Defs.h" #include "../../ErrorCode.hpp" +#include "../../streaming_compression/lzma/Decompressor.hpp" #include "../../streaming_compression/passthrough/Decompressor.hpp" #include "../../streaming_compression/zstd/Decompressor.hpp" #include "../Constants.hpp" @@ -59,6 +60,8 @@ class Segment { streaming_compression::passthrough::Decompressor m_decompressor; #elif USE_ZSTD_COMPRESSION streaming_compression::zstd::Decompressor m_decompressor; +#elif USE_LZMA_COMPRESSION + streaming_compression::lzma::Decompressor m_decompressor; #else static_assert(false, "Unsupported compression mode."); #endif diff --git a/components/core/src/clp/streaming_archive/writer/Segment.cpp b/components/core/src/clp/streaming_archive/writer/Segment.cpp index 06205481d..3c3de976d 100644 --- a/components/core/src/clp/streaming_archive/writer/Segment.cpp +++ b/components/core/src/clp/streaming_archive/writer/Segment.cpp @@ -45,6 +45,8 @@ void Segment::open(string const& segments_dir_path, segment_id_t id, int compres m_compressor.open(m_file_writer); #elif USE_ZSTD_COMPRESSION m_compressor.open(m_file_writer, compression_level); +#elif USE_LZMA_COMPRESSION + m_compressor.open(m_file_writer, compression_level); #else static_assert(false, "Unsupported compression mode."); #endif diff --git a/components/core/src/clp/streaming_archive/writer/Segment.hpp b/components/core/src/clp/streaming_archive/writer/Segment.hpp index d3dc69617..956303e43 100644 --- a/components/core/src/clp/streaming_archive/writer/Segment.hpp +++ b/components/core/src/clp/streaming_archive/writer/Segment.hpp @@ -6,6 +6,7 @@ #include "../../Defs.h" #include "../../ErrorCode.hpp" +#include "../../streaming_compression/lzma/Compressor.hpp" #include "../../streaming_compression/passthrough/Compressor.hpp" #include "../../streaming_compression/zstd/Compressor.hpp" #include "../../TraceableException.hpp" @@ -90,6 +91,8 @@ class Segment { streaming_compression::passthrough::Compressor m_compressor; #elif USE_ZSTD_COMPRESSION streaming_compression::zstd::Compressor m_compressor; +#elif USE_LZMA_COMPRESSION + streaming_compression::lzma::Compressor m_compressor; #else static_assert(false, "Unsupported compression mode."); #endif diff --git a/components/core/src/clp/streaming_compression/Constants.hpp b/components/core/src/clp/streaming_compression/Constants.hpp index 4649c2e98..080f3a20b 100644 --- a/components/core/src/clp/streaming_compression/Constants.hpp +++ b/components/core/src/clp/streaming_compression/Constants.hpp @@ -7,6 +7,7 @@ namespace clp::streaming_compression { enum class CompressorType : uint8_t { ZSTD = 0x10, + LZMA = 0x20, Passthrough = 0xFF, }; } // namespace clp::streaming_compression diff --git a/components/core/src/clp/streaming_compression/lzma/Compressor.cpp b/components/core/src/clp/streaming_compression/lzma/Compressor.cpp new file mode 100644 index 000000000..c711a6d60 --- /dev/null +++ b/components/core/src/clp/streaming_compression/lzma/Compressor.cpp @@ -0,0 +1,307 @@ +#include "Compressor.hpp" + +// spdlog +#include + +// Compression libraries +#include +#include + +// Project headers +#include "../../Defs.h" + +// File-scope constants +static constexpr size_t cCompressedStreamBlockBufferSize = 4096; // 4KiB + +namespace clp::streaming_compression::lzma { +Compressor::LzmaOption Compressor::m_option; + +Compressor::Compressor() + : ::clp::streaming_compression::Compressor(CompressorType::LZMA), + m_compression_stream_contains_data(false), + m_compressed_stream_file_writer(nullptr), + m_compression_stream(nullptr) { + m_compressed_stream_block_buffer = std::make_unique(cCompressedStreamBlockBufferSize); + m_compression_stream = new lzma_stream; + memset(m_compression_stream, 0, sizeof(lzma_stream)); +} + +Compressor::~Compressor() { + if (nullptr != m_compression_stream) { + delete m_compression_stream; + } +} + +void Compressor::init_lzma_encoder(lzma_stream* strm) { + lzma_options_lzma options; + if (lzma_lzma_preset(&options, m_option.get_compression_level())) { + SPDLOG_ERROR("Failed to initialize LZMA options."); + throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__); + } + options.dict_size = m_option.get_dict_size(); + lzma_filter filters[2]{ + {LZMA_FILTER_LZMA2, &options}, + {LZMA_VLI_UNKNOWN, nullptr}, + }; + + // Initialize the encoder using a preset. Set the integrity to check + // to CRC64, which is the default in the xz command line tool. If + // the .xz file needs to be decompressed with XZ Embedded, use + // LZMA_CHECK_CRC32 instead. + lzma_ret ret = lzma_stream_encoder(strm, filters, LZMA_CHECK_CRC64); + + // Return successfully if the initialization went fine. + if (ret == LZMA_OK) { + return; + } + + // Something went wrong. The possible errors are documented in + // lzma/container.h (src/liblzma/api/lzma/container.h in the source + // package or e.g. /usr/include/lzma/container.h depending on the + // install prefix). + char const* msg; + switch (ret) { + case LZMA_MEM_ERROR: + msg = "Memory allocation failed"; + break; + + case LZMA_OPTIONS_ERROR: + msg = "Specified preset is not supported"; + break; + + case LZMA_UNSUPPORTED_CHECK: + msg = "Specified integrity check is not supported"; + break; + + default: + // This is most likely LZMA_PROG_ERROR indicating a bug in + // this program or in liblzma. It is inconvenient to have a + // separate error message for errors that should be impossible + // to occur, but knowing the error code is important for + // debugging. That's why it is good to print the error code + // at least when there is no good error message to show. + msg = "Unknown error, possibly a bug"; + break; + } + + SPDLOG_ERROR("Error initializing the encoder: {} (error code {})", msg, int(ret)); + throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__); +} + +void Compressor::open(FileWriter& file_writer, int compression_level) { + if (nullptr != m_compressed_stream_file_writer) { + throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__); + } + + if (false == (0 <= compression_level && compression_level <= 9)) { + throw OperationFailed(ErrorCode_Unsupported, __FILENAME__, __LINE__); + } + if (compression_level != m_option.get_compression_level()) { + m_option.set_compression_level(compression_level); + } + + init_lzma_encoder(m_compression_stream); + // Setup compressed stream parameters + m_compression_stream->next_in = nullptr; + m_compression_stream->avail_in = 0; + m_compression_stream->next_out = m_compressed_stream_block_buffer.get(); + m_compression_stream->avail_out = cCompressedStreamBlockBufferSize; + + m_compressed_stream_file_writer = &file_writer; + + m_uncompressed_stream_pos = 0; +} + +void Compressor::close() { + if (nullptr == m_compressed_stream_file_writer) { + throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__); + } + + flush_and_close_compression_stream(); + m_compressed_stream_file_writer = nullptr; +} + +void Compressor::write(char const* data, size_t data_length) { + if (nullptr == m_compressed_stream_file_writer) { + throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__); + } + + if (0 == data_length) { + // Nothing needs to be done because we do not need to compress anything + return; + } + if (nullptr == data) { + throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__); + } + lzma_action action = LZMA_RUN; + m_compression_stream->next_in = reinterpret_cast(const_cast(data)); + m_compression_stream->avail_in = data_length; + + // Compress all data + bool hit_input_eof = false; + while (!hit_input_eof) { + lzma_ret return_value = lzma_code(m_compression_stream, action); + switch (return_value) { + case LZMA_OK: + case LZMA_BUF_ERROR: + break; + case LZMA_STREAM_END: + hit_input_eof = true; + break; + default: + SPDLOG_ERROR("lzma() returned an unexpected value - {}.", int(return_value)); + throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__); + } + + if (0 == m_compression_stream->avail_in) { + // No more data to compress + break; + } + + // Write output buffer to file if it's full + if (0 == m_compression_stream->avail_out) { + m_compressed_stream_file_writer->write( + reinterpret_cast(m_compressed_stream_block_buffer.get()), + cCompressedStreamBlockBufferSize + ); + m_compression_stream->next_out = m_compressed_stream_block_buffer.get(); + m_compression_stream->avail_out = cCompressedStreamBlockBufferSize; + } + } + + // Write any compressed data + if (m_compression_stream->avail_out < cCompressedStreamBlockBufferSize) { + m_compressed_stream_file_writer->write( + reinterpret_cast(m_compressed_stream_block_buffer.get()), + cCompressedStreamBlockBufferSize - m_compression_stream->avail_out + ); + m_compression_stream->next_out = m_compressed_stream_block_buffer.get(); + m_compression_stream->avail_out = cCompressedStreamBlockBufferSize; + } + + m_compression_stream->next_in = nullptr; + + m_compression_stream_contains_data = true; + m_uncompressed_stream_pos += data_length; +} + +void Compressor::flush() { + if (false == m_compression_stream_contains_data) { + return; + } + // Z_NO_FLUSH - deflate decides how much data to accumulate before producing output + // Z_SYNC_FLUSH - All pending output flushed to output buf and output aligned to byte + // boundary (completes current block and follows it with empty block that is 3 bits plus + // filler to next byte, followed by 4 bytes Z_PARTIAL_FLUSH - Same as Z_SYNC_FLUSH but + // output not aligned to byte boundary (completes current block and follows it with empty + // fixed codes block that is 10 bits long) Z_BLOCK - Same as Z_SYNC_FLUSH but output not + // aligned on a byte boundary and up to 7 bits of current block held to be written + // Z_FULL_FLUSH - Same as Z_SYNC_FLUSH but compression state reset so that decompression can + // restart from this point if the previous compressed data has been damaged Z_FINISH - + // Pending output flushed and deflate returns Z_STREAM_END if there was enough output space, + // or Z_OK or Z_BUF_ERROR if it needs to be called again with more space + // + + bool flush_complete = false; + while (true) { + lzma_ret return_value = lzma_code(m_compression_stream, LZMA_SYNC_FLUSH); + switch (return_value) { + case LZMA_STREAM_END: + flush_complete = true; + break; + case LZMA_OK: + case LZMA_BUF_ERROR: + break; + default: + SPDLOG_ERROR("lzma() returned an unexpected value - {}.", int(return_value)); + throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__); + } + if (flush_complete) { + break; + } + + // Write output buffer to file if it's full + if (0 == m_compression_stream->avail_out) { + m_compressed_stream_file_writer->write( + reinterpret_cast(m_compressed_stream_block_buffer.get()), + cCompressedStreamBlockBufferSize + ); + m_compression_stream->next_out = m_compressed_stream_block_buffer.get(); + m_compression_stream->avail_out = cCompressedStreamBlockBufferSize; + } + } + + // Write any compressed data + if (m_compression_stream->avail_out < cCompressedStreamBlockBufferSize) { + m_compressed_stream_file_writer->write( + reinterpret_cast(m_compressed_stream_block_buffer.get()), + cCompressedStreamBlockBufferSize - m_compression_stream->avail_out + ); + m_compression_stream->next_out = m_compressed_stream_block_buffer.get(); + m_compression_stream->avail_out = cCompressedStreamBlockBufferSize; + } + + m_compression_stream_contains_data = false; +} + +ErrorCode Compressor::try_get_pos(size_t& pos) const { + if (nullptr == m_compressed_stream_file_writer) { + return ErrorCode_NotInit; + } + + pos = m_uncompressed_stream_pos; + return ErrorCode_Success; +} + +void Compressor::flush_and_close_compression_stream() { + if (nullptr == m_compressed_stream_file_writer) { + throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__); + } + + bool flush_complete = false; + while (true) { + lzma_ret return_value = lzma_code(m_compression_stream, LZMA_FINISH); + switch (return_value) { + case LZMA_OK: + case LZMA_BUF_ERROR: + break; + case LZMA_STREAM_END: + flush_complete = true; + break; + default: + // SPDLOG_ERROR("deflate() returned an unexpected value - + // {}.", return_value); + throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__); + } + if (flush_complete) { + break; + } + + // Write output buffer to file if it's full + if (0 == m_compression_stream->avail_out) { + m_compressed_stream_file_writer->write( + reinterpret_cast(m_compressed_stream_block_buffer.get()), + cCompressedStreamBlockBufferSize + ); + m_compression_stream->next_out = m_compressed_stream_block_buffer.get(); + m_compression_stream->avail_out = cCompressedStreamBlockBufferSize; + } + } + + // Write any compressed data + if (m_compression_stream->avail_out < cCompressedStreamBlockBufferSize) { + m_compressed_stream_file_writer->write( + reinterpret_cast(m_compressed_stream_block_buffer.get()), + cCompressedStreamBlockBufferSize - m_compression_stream->avail_out + ); + m_compression_stream->next_out = m_compressed_stream_block_buffer.get(); + m_compression_stream->avail_out = cCompressedStreamBlockBufferSize; + } + + m_compression_stream_contains_data = false; + + lzma_end(m_compression_stream); + m_compression_stream->avail_out = 0; + m_compression_stream->next_out = nullptr; +} +} // namespace clp::streaming_compression::lzma diff --git a/components/core/src/clp/streaming_compression/lzma/Compressor.hpp b/components/core/src/clp/streaming_compression/lzma/Compressor.hpp new file mode 100644 index 000000000..61a34ac40 --- /dev/null +++ b/components/core/src/clp/streaming_compression/lzma/Compressor.hpp @@ -0,0 +1,134 @@ +#ifndef CLP_STREAMING_COMPRESSION_LZMA_COMPRESSOR_HPP +#define CLP_STREAMING_COMPRESSION_LZMA_COMPRESSOR_HPP + +// C++ standard libraries +#include +#include + +// Compression libraries +#include +#include + +// Project headers +#include "../../FileWriter.hpp" +#include "../../TraceableException.hpp" +#include "../Compressor.hpp" +#include "Constants.hpp" + +namespace clp::streaming_compression::lzma { +class Compressor : public ::clp::streaming_compression::Compressor { +public: + // Types + class OperationFailed : public TraceableException { + public: + // Constructors + OperationFailed(ErrorCode error_code, char const* const filename, int line_number) + : TraceableException(error_code, filename, line_number) {} + + // Methods + char const* what() const noexcept override { + return "streaming_compression::lzma::Compressor operation failed"; + } + }; + + class LzmaOption { + public: + LzmaOption() + : m_compression_level{cDefaultCompressionLevel}, + m_dict_size{cDefaultDictionarySize} {} + + auto set_compression_level(int compression_level) -> void { + if (0 > compression_level) { + m_compression_level = 0; + } else if (9 < compression_level) { + m_compression_level = 9; + } else { + m_compression_level = compression_level; + } + } + + auto set_dict_size(uint32_t dict_size) -> void { m_dict_size = dict_size; } + + [[nodiscard]] auto get_compression_level() const -> int { return m_compression_level; } + + [[nodiscard]] auto get_dict_size() const -> uint32_t { return m_dict_size; } + + private: + int m_compression_level; + uint32_t m_dict_size; + }; + + // Constructor + Compressor(); + + // Destructor + ~Compressor(); + + // Explicitly disable copy and move constructor/assignment + Compressor(Compressor const&) = delete; + Compressor& operator=(Compressor const&) = delete; + + // Methods implementing the WriterInterface + /** + * Writes the given data to the compressor + * @param data + * @param data_length + */ + void write(char const* data, size_t data_length) override; + /** + * Writes any internally buffered data to file and ends the current frame + */ + void flush() override; + + /** + * Tries to get the current position of the write head + * @param pos Position of the write head + * @return ErrorCode_NotInit if the compressor is not open + * @return ErrorCode_Success on success + */ + ErrorCode try_get_pos(size_t& pos) const override; + + /** + * Closes the compressor + */ + void close() override; + + // Methods implementing the Compressor interface + /** + * Initialize streaming compressor + * @param file_writer + * @param compression_level + */ + void open(FileWriter& file_writer, int compression_level = cDefaultCompressionLevel); + + + // Methods + static auto set_compression_level(int compression_level) -> void { + m_option.set_compression_level(compression_level); + } + + static auto set_dict_size(uint32_t dict_size) -> void { m_option.set_dict_size(dict_size); } + +private: + /** + * Flushes the stream and closes it + */ + void flush_and_close_compression_stream(); + + static void init_lzma_encoder(lzma_stream* strm); + static LzmaOption m_option; + + // Variables + FileWriter* m_compressed_stream_file_writer; + + // Compressed stream variables + lzma_stream* m_compression_stream; + bool m_compression_stream_contains_data; + + std::unique_ptr m_compressed_stream_block_buffer; + + size_t m_uncompressed_stream_pos; +}; +} // namespace clp::streaming_compression::lzma + +#endif // CLP_STREAMING_COMPRESSION_LZMA_COMPRESSOR_HPP diff --git a/components/core/src/clp/streaming_compression/lzma/Constants.hpp b/components/core/src/clp/streaming_compression/lzma/Constants.hpp new file mode 100644 index 000000000..9639417da --- /dev/null +++ b/components/core/src/clp/streaming_compression/lzma/Constants.hpp @@ -0,0 +1,15 @@ +#ifndef STREAMING_COMPRESSION_LZMA_CONSTANTS_HPP +#define STREAMING_COMPRESSION_LZMA_CONSTANTS_HPP + +#include + +// C++ libraries +#include +#include + +namespace clp::streaming_compression::lzma { +constexpr int cDefaultCompressionLevel{3}; +constexpr uint32_t cDefaultDictionarySize{LZMA_DICT_SIZE_DEFAULT}; +} // namespace streaming_compression::lzma + +#endif // STREAMING_COMPRESSION_LZMA_CONSTANTS_HPP diff --git a/components/core/src/clp/streaming_compression/lzma/Decompressor.cpp b/components/core/src/clp/streaming_compression/lzma/Decompressor.cpp new file mode 100644 index 000000000..968837501 --- /dev/null +++ b/components/core/src/clp/streaming_compression/lzma/Decompressor.cpp @@ -0,0 +1,366 @@ +#include "Decompressor.hpp" + +// C++ Standard Libraries +#include + +// Boost libraries +#include + +// spdlog +#include + +// Compression libraries +#include +#include + +// Project headers +#include "../../Defs.h" + +namespace clp::streaming_compression::lzma { +Decompressor::Decompressor() + : ::clp::streaming_compression::Decompressor(CompressorType::LZMA), + m_input_type(InputType::NotInitialized), + m_decompression_stream(nullptr), + m_file_reader(nullptr), + m_file_reader_initial_pos(0), + m_file_read_buffer_length(0), + m_file_read_buffer_capacity(0), + m_decompressed_stream_pos(0), + m_unused_decompressed_stream_block_size(0) { + // Create block to hold unused decompressed data + m_unused_decompressed_stream_block_buffer + = std::make_unique(m_unused_decompressed_stream_block_size); + m_decompression_stream = new lzma_stream; + memset(m_decompression_stream, 0, sizeof(lzma_stream)); +} + +Decompressor::~Decompressor() { + delete m_decompression_stream; +} + +void Decompressor::exact_read(char* buf, size_t num_bytes_to_read, size_t& num_bytes_read) { + auto errorcode = try_read(buf, num_bytes_to_read, num_bytes_read); + if (num_bytes_read != num_bytes_to_read) { + SPDLOG_ERROR("FAILED TO READ EXACTLY {} bytes", num_bytes_to_read); + throw; + } + if (errorcode != ErrorCode_Success) { + SPDLOG_ERROR("FAILED TO READ EXACTLY {} bytes", num_bytes_to_read); + throw; + } +} + +ErrorCode Decompressor::try_read(char* buf, size_t num_bytes_to_read, size_t& num_bytes_read) { + if (InputType::NotInitialized == m_input_type) { + return ErrorCode_NotInit; + } + if (nullptr == buf) { + return ErrorCode_BadParam; + } + if (0 == num_bytes_to_read) { + return ErrorCode_Success; + } + + num_bytes_read = 0; + + m_decompression_stream->next_out = reinterpret_cast(buf); + m_decompression_stream->avail_out = num_bytes_to_read; + while (true) { + // Check if there's data that can be decompressed + if (0 == m_decompression_stream->avail_in) { + if (InputType::File != m_input_type) { + // if we hit here, there must be something wrong + // we have consumed all data buffer but for some reason it still requires more. + return ErrorCode_EndOfFile; + } else { + auto error_code = m_file_reader->try_read( + m_file_read_buffer.get(), + m_file_read_buffer_capacity, + m_file_read_buffer_length + ); + m_decompression_stream->avail_in = m_file_read_buffer_length; + m_decompression_stream->next_in + = reinterpret_cast(m_file_read_buffer.get()); + if (ErrorCode_Success != error_code) { + if (ErrorCode_EndOfFile == error_code) { + num_bytes_read = num_bytes_to_read - m_decompression_stream->avail_out; + m_decompressed_stream_pos += num_bytes_read; + return ErrorCode_EndOfFile; + } + } + } + } + + lzma_ret return_value = lzma_code(m_decompression_stream, LZMA_RUN); + switch (return_value) { + case LZMA_OK: + case LZMA_BUF_ERROR: + if (0 == m_decompression_stream->avail_out) { + m_decompression_stream->next_out = nullptr; + num_bytes_read = num_bytes_to_read; + m_decompressed_stream_pos += num_bytes_read; + return ErrorCode_Success; + } + // by breaking here, enter the next iteration of decompressing + break; + case LZMA_STREAM_END: + if (0 == m_decompression_stream->avail_out) { + m_decompression_stream->next_out = nullptr; + num_bytes_read = num_bytes_to_read; + m_decompressed_stream_pos += num_bytes_read; + return ErrorCode_Success; + } + SPDLOG_ERROR("streaming_compression::lzma::Decompressor wants to read more but " + "reached end of file"); + throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__); + case LZMA_MEM_ERROR: + SPDLOG_ERROR("streaming_compression::lzma::Decompressor inflate() ran out of memory" + ); + throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__); + default: + SPDLOG_ERROR("inflate() returned an unexpected value - {}.", int(return_value)); + throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__); + } + } +} + +ErrorCode Decompressor::try_seek_from_begin(size_t pos) { + if (InputType::NotInitialized == m_input_type) { + throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__); + } + + // Check if we've already decompressed passed the desired position + if (m_decompressed_stream_pos > pos) { + // ZStd has no way for us to seek back to the desired position, so just reset the stream + // to the beginning + reset_stream(); + } + + // We need to fast-forward the decompression stream to decompressed_stream_pos + ErrorCode error; + while (m_decompressed_stream_pos < pos) { + size_t num_bytes_to_decompress = std::min( + m_unused_decompressed_stream_block_size, + pos - m_decompressed_stream_pos + ); + error = try_read_exact_length( + m_unused_decompressed_stream_block_buffer.get(), + num_bytes_to_decompress + ); + if (ErrorCode_Success != error) { + return error; + } + } + + return ErrorCode_Success; +} + +ErrorCode Decompressor::try_get_pos(size_t& pos) { + if (InputType::NotInitialized == m_input_type) { + return ErrorCode_NotInit; + } + + pos = m_decompressed_stream_pos; + return ErrorCode_Success; +} + +void Decompressor::close() { + if (InputType::NotInitialized == m_input_type) { + return; + } + lzma_end(m_decompression_stream); + m_decompression_stream->avail_out = 0; + m_decompression_stream->next_out = nullptr; + if (InputType::MemoryMappedCompressedFile == m_input_type) { + if (m_memory_mapped_compressed_file.is_open()) { + // An existing file is memory mapped by the decompressor + m_memory_mapped_compressed_file.close(); + } + } else if (InputType::File == m_input_type) { + m_file_read_buffer.reset(); + m_file_read_buffer_capacity = 0; + m_file_read_buffer_length = 0; + m_file_reader = nullptr; + } + m_input_type = InputType::NotInitialized; +} + +void Decompressor::init_decoder(lzma_stream* strm) { + // Initialize a .xz decoder. The decoder supports a memory usage limit + // and a set of flags. + // + // The memory usage of the decompressor depends on the settings used + // to compress a .xz file. It can vary from less than a megabyte to + // a few gigabytes, but in practice (at least for now) it rarely + // exceeds 65 MiB because that's how much memory is required to + // decompress files created with "xz -9". Settings requiring more + // memory take extra effort to use and don't (at least for now) + // provide significantly better compression in most cases. + // + // Memory usage limit is useful if it is important that the + // decompressor won't consume gigabytes of memory. The need + // for limiting depends on the application. In this example, + // no memory usage limiting is used. This is done by setting + // the limit to UINT64_MAX. + // + // The .xz format allows concatenating compressed files as is: + // + // echo foo | xz > foobar.xz + // echo bar | xz >> foobar.xz + // + // When decompressing normal standalone .xz files, LZMA_CONCATENATED + // should always be used to support decompression of concatenated + // .xz files. If LZMA_CONCATENATED isn't used, the decoder will stop + // after the first .xz stream. This can be useful when .xz data has + // been embedded inside another file format. + // + // Flags other than LZMA_CONCATENATED are supported too, and can + // be combined with bitwise-or. See lzma/container.h + // (src/liblzma/api/lzma/container.h in the source package or e.g. + // /usr/include/lzma/container.h depending on the install prefix) + // for details. + lzma_ret ret = lzma_stream_decoder(strm, UINT64_MAX, LZMA_CONCATENATED); + + // Return successfully if the initialization went fine. + if (ret == LZMA_OK) { + return; + } + + // Something went wrong. The possible errors are documented in + // lzma/container.h (src/liblzma/api/lzma/container.h in the source + // package or e.g. /usr/include/lzma/container.h depending on the + // install prefix). + // + // Note that LZMA_MEMLIMIT_ERROR is never possible here. If you + // specify a very tiny limit, the error will be delayed until + // the first headers have been parsed by a call to lzma_code(). + char const* msg; + switch (ret) { + case LZMA_MEM_ERROR: + msg = "Memory allocation failed"; + break; + + case LZMA_OPTIONS_ERROR: + msg = "Unsupported decompressor flags"; + break; + + default: + // This is most likely LZMA_PROG_ERROR indicating a bug in + // this program or in liblzma. It is inconvenient to have a + // separate error message for errors that should be impossible + // to occur, but knowing the error code is important for + // debugging. That's why it is good to print the error code + // at least when there is no good error message to show. + msg = "Unknown error, possibly a bug"; + break; + } + + SPDLOG_ERROR("Error initializing the decoder: {} (error code {})", msg, int(ret)); +} + +void Decompressor::open(char const* compressed_data_buf, size_t compressed_data_buf_size) { + if (InputType::NotInitialized != m_input_type) { + throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__); + } + m_input_type = InputType::CompressedDataBuf; + + // Configure input stream + reset_stream(); + m_decompression_stream->next_in + = reinterpret_cast(const_cast(compressed_data_buf)); + m_decompression_stream->avail_in = compressed_data_buf_size; + m_decompression_stream->next_out = nullptr; + m_decompression_stream->avail_out = 0; +} + +ErrorCode Decompressor::open(std::string const& compressed_file_path) { + if (InputType::NotInitialized != m_input_type) { + throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__); + } + m_input_type = InputType::MemoryMappedCompressedFile; + + // Create memory mapping for compressed_file_path, use boost read only memory mapped file + boost::system::error_code boost_error_code; + size_t compressed_file_size + = boost::filesystem::file_size(compressed_file_path, boost_error_code); + if (boost_error_code) { + SPDLOG_ERROR( + "streaming_compression::zstd::Decompressor: Unable to obtain file size for " + "'{}' - {}.", + compressed_file_path.c_str(), + boost_error_code.message().c_str() + ); + return ErrorCode_Failure; + } + + boost::iostreams::mapped_file_params memory_map_params; + memory_map_params.path = compressed_file_path; + memory_map_params.flags = boost::iostreams::mapped_file::readonly; + memory_map_params.length = compressed_file_size; + memory_map_params.hint = m_memory_mapped_compressed_file.data( + ); // Try to map it to the same memory location as previous memory mapped file + m_memory_mapped_compressed_file.open(memory_map_params); + if (!m_memory_mapped_compressed_file.is_open()) { + SPDLOG_ERROR( + "streaming_compression::lzma::Decompressor: Unable to memory map the " + "compressed file with path: {}", + compressed_file_path.c_str() + ); + return ErrorCode_Failure; + } + + // Configure input stream + reset_stream(); + m_decompression_stream->next_in + = reinterpret_cast(const_cast(m_memory_mapped_compressed_file.data())); + m_decompression_stream->avail_in = compressed_file_size; + m_decompression_stream->next_out = nullptr; + m_decompression_stream->avail_out = 0; + + return ErrorCode_Success; +} + +void Decompressor::open(FileReader& file_reader, size_t file_read_buffer_capacity) { + if (InputType::NotInitialized != m_input_type) { + throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__); + } + m_input_type = InputType::File; + + m_file_reader = &file_reader; + m_file_reader_initial_pos = m_file_reader->get_pos(); + + m_file_read_buffer_capacity = file_read_buffer_capacity; + m_file_read_buffer = std::make_unique(m_file_read_buffer_capacity); + m_file_read_buffer_length = 0; + + // Configure input stream + reset_stream(); + m_decompression_stream->next_in = reinterpret_cast(m_file_read_buffer.get()); + m_decompression_stream->avail_in = m_file_read_buffer_length; + m_decompression_stream->next_out = nullptr; + m_decompression_stream->avail_out = 0; +} + +ErrorCode Decompressor::get_decompressed_stream_region( + size_t decompressed_stream_pos, + char* extraction_buf, + size_t extraction_len +) { + auto error_code = try_seek_from_begin(decompressed_stream_pos); + if (ErrorCode_Success != error_code) { + return error_code; + } + + error_code = try_read_exact_length(extraction_buf, extraction_len); + return error_code; +} + +void Decompressor::reset_stream() { + if (InputType::File == m_input_type) { + m_file_reader->seek_from_begin(m_file_reader_initial_pos); + m_file_read_buffer_length = 0; + } + m_decompressed_stream_pos = 0; + init_decoder(m_decompression_stream); +} +} // namespace clp::streaming_compression::lzma diff --git a/components/core/src/clp/streaming_compression/lzma/Decompressor.hpp b/components/core/src/clp/streaming_compression/lzma/Decompressor.hpp new file mode 100644 index 000000000..22d13c957 --- /dev/null +++ b/components/core/src/clp/streaming_compression/lzma/Decompressor.hpp @@ -0,0 +1,163 @@ +#ifndef CLP_STREAMING_COMPRESSION_LZMA_DECOMPRESSOR_HPP +#define CLP_STREAMING_COMPRESSION_LZMA_DECOMPRESSOR_HPP + +// C++ standard libraries +#include +#include + +// Compression libraries +#include +#include + +// Boost libraries +#include + +// Project headers +#include "../../FileReader.hpp" +#include "../../TraceableException.hpp" +#include "../Decompressor.hpp" + +namespace clp::streaming_compression::lzma { +class Decompressor : public ::clp::streaming_compression::Decompressor { +public: + // Types + class OperationFailed : public TraceableException { + public: + // Constructors + OperationFailed(ErrorCode error_code, char const* const filename, int line_number) + : TraceableException(error_code, filename, line_number) {} + + // Methods + char const* what() const noexcept override { + return "streaming_compression::lzma::Decompressor operation failed"; + } + }; + + // Constructor + Decompressor(); + + // Destructor + ~Decompressor(); + + // Explicitly disable copy and move constructor/assignment + Decompressor(Decompressor const&) = delete; + Decompressor& operator=(Decompressor const&) = delete; + + // Methods implementing the ReaderInterface + /** + * Tries to read up to a given number of bytes from the decompressor + * @param buf + * @param num_bytes_to_read The number of bytes to try and read + * @param num_bytes_read The actual number of bytes read + * @return Same as FileReader::try_read if the decompressor is attached to a file + * @return ErrorCode_NotInit if the decompressor is not open + * @return ErrorCode_BadParam if buf is invalid + * @return ErrorCode_EndOfFile on EOF + * @return ErrorCode_Failure on decompression failure + * @return ErrorCode_Success on success + */ + ErrorCode try_read(char* buf, size_t num_bytes_to_read, size_t& num_bytes_read) override; + + /** + */ + void exact_read(char* buf, size_t num_bytes_to_read, size_t& num_bytes_read); + + /** + * Tries to seek from the beginning to the given position + * @param pos + * @return ErrorCode_NotInit if the decompressor is not open + * @return Same as ReaderInterface::try_read_exact_length + * @return ErrorCode_Success on success + */ + ErrorCode try_seek_from_begin(size_t pos) override; + /** + * Tries to get the current position of the read head + * @param pos Position of the read head in the file + * @return ErrorCode_NotInit if the decompressor is not open + * @return ErrorCode_Success on success + */ + ErrorCode try_get_pos(size_t& pos) override; + + // Methods implementing the Decompressor interface + void close() override; + /** + * Decompresses and copies the range of uncompressed data described by + * decompressed_stream_pos and extraction_len into extraction_buf + * @param decompressed_stream_pos + * @param extraction_buf + * @param extraction_len + * @return Same as streaming_compression::zstd::Decompressor::try_seek_from_begin + * @return Same as ReaderInterface::try_read_exact_length + */ + ErrorCode get_decompressed_stream_region( + size_t decompressed_stream_pos, + char* extraction_buf, + size_t extraction_len + ) override; + + // Methods + /*** + * Initialize streaming decompressor to decompress from the specified compressed data buffer + * @param compressed_data_buf + * @param compressed_data_buf_size + */ + void open(char const* compressed_data_buf, size_t compressed_data_buf_size) override; + + /*** + * Initialize streaming decompressor to decompress from a compressed file specified by the + * given path + * @param compressed_file_path + * @param decompressed_stream_block_size + * @return ErrorCode_Failure if the provided path cannot be memory mapped + * @return ErrorCode_Success on success + */ + ErrorCode open(std::string const& compressed_file_path); + + /** + * Initializes the decompressor to decompress from an open file + * @param file_reader + * @param file_read_buffer_capacity The maximum amount of data to read from a file at a time + */ + void open(FileReader& file_reader, size_t file_read_buffer_capacity) override; + +private: + // Enum class + enum class InputType { + NotInitialized, // Note: do nothing but generate an error to prevent this required + // parameter is not initialized properly + CompressedDataBuf, + MemoryMappedCompressedFile, + File + }; + + // Methods + /** + * Reset streaming decompression state so it will start decompressing from the beginning of + * the stream afterwards + */ + void reset_stream(); + + void init_decoder(lzma_stream* strm); + + // Variables + InputType m_input_type; + + // Compressed stream variables + lzma_stream* m_decompression_stream{nullptr}; + + boost::iostreams::mapped_file_source m_memory_mapped_compressed_file; + FileReader* m_file_reader; + size_t m_file_reader_initial_pos; + std::unique_ptr m_file_read_buffer; + size_t m_file_read_buffer_length; + size_t m_file_read_buffer_capacity; + + size_t m_decompressed_stream_pos; + size_t m_unused_decompressed_stream_block_size; + std::unique_ptr m_unused_decompressed_stream_block_buffer; + + char const* m_compressed_stream_block; + size_t m_compressed_stream_block_size; +}; +} // namespace clp::streaming_compression::lzma +#endif // CLP_STREAMING_COMPRESSION_LZMA_DECOMPRESSOR_HPP diff --git a/components/core/tests/test-StreamingCompression.cpp b/components/core/tests/test-StreamingCompression.cpp index b43316b3f..e659f3e9b 100644 --- a/components/core/tests/test-StreamingCompression.cpp +++ b/components/core/tests/test-StreamingCompression.cpp @@ -5,6 +5,8 @@ #include #include +#include "../src/clp/streaming_compression/lzma/Compressor.hpp" +#include "../src/clp/streaming_compression/lzma/Decompressor.hpp" #include "../src/clp/streaming_compression/passthrough/Compressor.hpp" #include "../src/clp/streaming_compression/passthrough/Decompressor.hpp" #include "../src/clp/streaming_compression/zstd/Compressor.hpp"