Skip to content

Commit

Permalink
Refactor lzma compressor to group common functionalities into helplers
Browse files Browse the repository at this point in the history
  • Loading branch information
Bill-hbrhbr committed Nov 26, 2024
1 parent d5af274 commit b94ca26
Show file tree
Hide file tree
Showing 2 changed files with 84 additions and 182 deletions.
210 changes: 70 additions & 140 deletions components/core/src/clp/streaming_compression/lzma/Compressor.cpp
Original file line number Diff line number Diff line change
@@ -1,38 +1,40 @@
#include "Compressor.hpp"

#include <spdlog/spdlog.h>
#include <array>
#include <cstddef>
#include <cstdint>
#include <cstring>

// Compression libraries
#include <lzma.h>
#include <zlib.h>
#include <spdlog/spdlog.h>

// Project headers
#include "../../Defs.h"
#include "../../ErrorCode.hpp"
#include "../../FileWriter.hpp"
#include "../../TraceableException.hpp"
#include "../../type_utils.hpp"
#include "Constants.hpp"

namespace clp::streaming_compression::lzma {
Compressor::LzmaOption Compressor::m_option;

Compressor::Compressor() {
memset(m_compression_stream.get(), 0, sizeof(LzmaStream));
}
using clp::size_checked_pointer_cast;

void Compressor::init_lzma_encoder(LzmaStream* strm) {
lzma_options_lzma options;
if (lzma_lzma_preset(&options, m_option.get_compression_level())) {
auto Compressor::init_lzma_encoder(LzmaStream* strm, int compression_level, size_t dict_size)
-> void {
LzmaOptionsLzma options;
if (0 != lzma_lzma_preset(&options, compression_level)) {
SPDLOG_ERROR("Failed to initialize LZMA options.");
throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__);
}
options.dict_size = m_option.get_dict_size();
lzma_filter filters[2]{
{LZMA_FILTER_LZMA2, &options},
{LZMA_VLI_UNKNOWN, nullptr},
};
options.dict_size = dict_size;
std::array<LzmaFilter, 2> filters{{
{.id = LZMA_FILTER_LZMA2, .options = &options},
{.id = LZMA_VLI_UNKNOWN, .options = nullptr},
}};

// Initialize the encoder using a preset. Set the integrity to check
// to CRC64, which is the default in the xz command line tool. If
// the .xz file needs to be decompressed with XZ Embedded, use
// LZMA_CHECK_CRC32 instead.
auto const ret = lzma_stream_encoder(strm, filters, LZMA_CHECK_CRC64);
auto const ret{lzma_stream_encoder(strm, filters.data(), LZMA_CHECK_CRC64)};

// Return successfully if the initialization went fine.
if (LZMA_OK == ret) {
Expand All @@ -43,7 +45,7 @@ void Compressor::init_lzma_encoder(LzmaStream* strm) {
// lzma/container.h (src/liblzma/api/lzma/container.h in the source
// package or e.g. /usr/include/lzma/container.h depending on the
// install prefix).
char const* msg;
char const* msg{nullptr};
switch (ret) {
case LZMA_MEM_ERROR:
msg = "Memory allocation failed";
Expand All @@ -68,23 +70,21 @@ void Compressor::init_lzma_encoder(LzmaStream* strm) {
break;
}

SPDLOG_ERROR("Error initializing the encoder: {} (error code {})", msg, int(ret));
SPDLOG_ERROR("Error initializing the encoder: {} (error code {})", msg, static_cast<int>(ret));
throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__);
}

void Compressor::open(FileWriter& file_writer, int compression_level) {
auto Compressor::open(FileWriter& file_writer, int compression_level) -> void {
if (nullptr != m_compressed_stream_file_writer) {
throw OperationFailed(ErrorCode_NotReady, __FILENAME__, __LINE__);
}

if (false == (0 <= compression_level && compression_level <= 9)) {
if (compression_level < cMinCompressionLevel || compression_level > cMaxCompressionLevel) {
throw OperationFailed(ErrorCode_Unsupported, __FILENAME__, __LINE__);
}
if (compression_level != m_option.get_compression_level()) {
m_option.set_compression_level(compression_level);
}

init_lzma_encoder(m_compression_stream.get());
memset(m_compression_stream.get(), 0, sizeof(LzmaStream));
init_lzma_encoder(m_compression_stream.get(), compression_level, m_dict_size);
// Setup compressed stream parameters
m_compression_stream->next_in = nullptr;
m_compression_stream->avail_in = 0;
Expand All @@ -96,7 +96,7 @@ void Compressor::open(FileWriter& file_writer, int compression_level) {
m_uncompressed_stream_pos = 0;
}

void Compressor::close() {
auto Compressor::close() -> void {
if (nullptr == m_compressed_stream_file_writer) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}
Expand All @@ -105,7 +105,7 @@ void Compressor::close() {
m_compressed_stream_file_writer = nullptr;
}

void Compressor::write(char const* data, size_t data_length) {
auto Compressor::write(char const* data, size_t data_length) -> void {
if (nullptr == m_compressed_stream_file_writer) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}
Expand All @@ -114,62 +114,23 @@ void Compressor::write(char const* data, size_t data_length) {
// Nothing needs to be done because we do not need to compress anything
return;
}

if (nullptr == data) {
throw OperationFailed(ErrorCode_BadParam, __FILENAME__, __LINE__);
}
lzma_action action = LZMA_RUN;
m_compression_stream->next_in = reinterpret_cast<Bytef*>(const_cast<char*>(data));
m_compression_stream->avail_in = data_length;

// Compress all data
bool hit_input_eof = false;
while (!hit_input_eof) {
auto const return_value = lzma_code(m_compression_stream.get(), action);
switch (return_value) {
case LZMA_OK:
case LZMA_BUF_ERROR:
break;
case LZMA_STREAM_END:
hit_input_eof = true;
break;
default:
SPDLOG_ERROR("lzma() returned an unexpected value - {}.", int(return_value));
throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__);
}

if (0 == m_compression_stream->avail_in) {
// No more data to compress
break;
}
m_compression_stream->next_in = size_checked_pointer_cast<uint8_t const>(data);
m_compression_stream->avail_in = data_length;

// Write output buffer to file if it's full
if (0 == m_compression_stream->avail_out) {
m_compressed_stream_file_writer->write(
reinterpret_cast<char*>(m_compressed_stream_block_buffer.data()),
cCompressedStreamBlockBufferSize
);
m_compression_stream->next_out = m_compressed_stream_block_buffer.data();
m_compression_stream->avail_out = cCompressedStreamBlockBufferSize;
}
}

// Write any compressed data
if (m_compression_stream->avail_out < cCompressedStreamBlockBufferSize) {
m_compressed_stream_file_writer->write(
reinterpret_cast<char*>(m_compressed_stream_block_buffer.data()),
cCompressedStreamBlockBufferSize - m_compression_stream->avail_out
);
m_compression_stream->next_out = m_compressed_stream_block_buffer.data();
m_compression_stream->avail_out = cCompressedStreamBlockBufferSize;
}
run_lzma(LZMA_RUN);

m_compression_stream->next_in = nullptr;

m_compression_stream_contains_data = true;
m_uncompressed_stream_pos += data_length;
}

void Compressor::flush() {
auto Compressor::flush() -> void {
if (false == m_compression_stream_contains_data) {
return;
}
Expand All @@ -184,51 +145,11 @@ void Compressor::flush() {
// restart from this point if the previous compressed data has been damaged Z_FINISH -
// Pending output flushed and deflate returns Z_STREAM_END if there was enough output space,
// or Z_OK or Z_BUF_ERROR if it needs to be called again with more space
//

bool flush_complete = false;
while (true) {
auto const return_value = lzma_code(m_compression_stream.get(), LZMA_SYNC_FLUSH);
switch (return_value) {
case LZMA_STREAM_END:
flush_complete = true;
break;
case LZMA_OK:
case LZMA_BUF_ERROR:
break;
default:
SPDLOG_ERROR("lzma() returned an unexpected value - {}.", int(return_value));
throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__);
}
if (flush_complete) {
break;
}

// Write output buffer to file if it's full
if (0 == m_compression_stream->avail_out) {
m_compressed_stream_file_writer->write(
reinterpret_cast<char*>(m_compressed_stream_block_buffer.data()),
cCompressedStreamBlockBufferSize
);
m_compression_stream->next_out = m_compressed_stream_block_buffer.data();
m_compression_stream->avail_out = cCompressedStreamBlockBufferSize;
}
}

// Write any compressed data
if (m_compression_stream->avail_out < cCompressedStreamBlockBufferSize) {
m_compressed_stream_file_writer->write(
reinterpret_cast<char*>(m_compressed_stream_block_buffer.data()),
cCompressedStreamBlockBufferSize - m_compression_stream->avail_out
);
m_compression_stream->next_out = m_compressed_stream_block_buffer.data();
m_compression_stream->avail_out = cCompressedStreamBlockBufferSize;
}

run_lzma(LZMA_SYNC_FLUSH);
m_compression_stream_contains_data = false;
}

ErrorCode Compressor::try_get_pos(size_t& pos) const {
auto Compressor::try_get_pos(size_t& pos) const -> ErrorCode {
if (nullptr == m_compressed_stream_file_writer) {
return ErrorCode_NotInit;
}
Expand All @@ -237,55 +158,64 @@ ErrorCode Compressor::try_get_pos(size_t& pos) const {
return ErrorCode_Success;
}

void Compressor::flush_and_close_compression_stream() {
auto Compressor::flush_and_close_compression_stream() -> void {
if (nullptr == m_compressed_stream_file_writer) {
throw OperationFailed(ErrorCode_NotInit, __FILENAME__, __LINE__);
}

bool flush_complete = false;
run_lzma(LZMA_FINISH);

m_compression_stream_contains_data = false;

lzma_end(m_compression_stream.get());
m_compression_stream->avail_out = 0;
m_compression_stream->next_out = nullptr;
}

auto Compressor::run_lzma(LzmaAction action) -> void {
// Compress all data
bool hit_input_eof{false};
while (true) {
lzma_ret return_value = lzma_code(m_compression_stream.get(), LZMA_FINISH);
switch (return_value) {
auto const rc = lzma_code(m_compression_stream.get(), action);
switch (rc) {
case LZMA_OK:
case LZMA_BUF_ERROR:
break;
case LZMA_STREAM_END:
flush_complete = true;
hit_input_eof = true;
break;
default:
// SPDLOG_ERROR("deflate() returned an unexpected value -
// {}.", return_value);
SPDLOG_ERROR("lzma() returned an unexpected value - {}.", static_cast<int>(rc));
throw OperationFailed(ErrorCode_Failure, __FILENAME__, __LINE__);
}
if (flush_complete) {

if (LZMA_RUN == action && 0 == m_compression_stream->avail_in) {
// No more data to compress
break;
}

if (hit_input_eof) {
break;
}

// Write output buffer to file if it's full
if (0 == m_compression_stream->avail_out) {
m_compressed_stream_file_writer->write(
reinterpret_cast<char*>(m_compressed_stream_block_buffer.data()),
cCompressedStreamBlockBufferSize
);
m_compression_stream->next_out = m_compressed_stream_block_buffer.data();
m_compression_stream->avail_out = cCompressedStreamBlockBufferSize;
write_data();
}
}

// Write any compressed data
if (m_compression_stream->avail_out < cCompressedStreamBlockBufferSize) {
m_compressed_stream_file_writer->write(
reinterpret_cast<char*>(m_compressed_stream_block_buffer.data()),
cCompressedStreamBlockBufferSize - m_compression_stream->avail_out
);
m_compression_stream->next_out = m_compressed_stream_block_buffer.data();
m_compression_stream->avail_out = cCompressedStreamBlockBufferSize;
write_data();
}
}

m_compression_stream_contains_data = false;

lzma_end(m_compression_stream.get());
m_compression_stream->avail_out = 0;
m_compression_stream->next_out = nullptr;
auto Compressor::write_data() -> void {
m_compressed_stream_file_writer->write(
size_checked_pointer_cast<char>(m_compressed_stream_block_buffer.data()),
cCompressedStreamBlockBufferSize - m_compression_stream->avail_out
);
m_compression_stream->next_out = m_compressed_stream_block_buffer.data();
m_compression_stream->avail_out = cCompressedStreamBlockBufferSize;
}
} // namespace clp::streaming_compression::lzma
Loading

0 comments on commit b94ca26

Please sign in to comment.