From cd6c0b861eeb7a098b9f5966a2f80e23717c1fb8 Mon Sep 17 00:00:00 2001 From: gibber9809 Date: Tue, 23 Jul 2024 23:27:57 +0000 Subject: [PATCH] Add back decompression side of table packing This reverts commit 794a7327da1a0905bc2b3b0f2bff34259d7bc78b. --- components/core/src/clp_s/ArchiveReader.cpp | 114 +++++++++++++------- components/core/src/clp_s/ArchiveReader.hpp | 24 ++++- components/core/src/clp_s/CMakeLists.txt | 2 + components/core/src/clp_s/SchemaReader.cpp | 18 ++-- components/core/src/clp_s/SchemaReader.hpp | 19 ++-- components/core/src/clp_s/TableReader.cpp | 108 +++++++++++++++++++ components/core/src/clp_s/TableReader.hpp | 89 +++++++++++++++ components/core/src/clp_s/search/Output.cpp | 2 +- 8 files changed, 312 insertions(+), 64 deletions(-) create mode 100644 components/core/src/clp_s/TableReader.cpp create mode 100644 components/core/src/clp_s/TableReader.hpp diff --git a/components/core/src/clp_s/ArchiveReader.cpp b/components/core/src/clp_s/ArchiveReader.cpp index f211a0707..3c62cc63b 100644 --- a/components/core/src/clp_s/ArchiveReader.cpp +++ b/components/core/src/clp_s/ArchiveReader.cpp @@ -27,8 +27,8 @@ void ArchiveReader::open(string_view archives_dir, string_view archive_id) { m_schema_tree = ReaderUtils::read_schema_tree(archive_path_str); m_schema_map = ReaderUtils::read_schemas(archive_path_str); - m_tables_file_reader.open(archive_path_str + constants::cArchiveTablesFile); m_table_metadata_file_reader.open(archive_path_str + constants::cArchiveTableMetadataFile); + m_table_reader.open_tables(archive_path_str + constants::cArchiveTablesFile); } void ArchiveReader::read_metadata() { @@ -38,6 +38,8 @@ void ArchiveReader::read_metadata() { cDecompressorFileReadBufferCapacity ); + m_table_reader.read_metadata(m_table_metadata_decompressor); + size_t num_schemas; if (auto error = m_table_metadata_decompressor.try_read_numeric_value(num_schemas); ErrorCodeSuccess != error) @@ -45,39 +47,65 @@ void ArchiveReader::read_metadata() { throw OperationFailed(error, __FILENAME__, __LINE__); } - for (size_t i = 0; i < num_schemas; i++) { - int32_t schema_id; - uint64_t num_messages; + bool prev_metadata_initialized{false}; + SchemaReader::SchemaMetadata prev_metadata{}; + int32_t prev_schema_id{}; + for (size_t i = 0; i < num_schemas; ++i) { + size_t table_id; size_t table_offset; - size_t uncompressed_size; + int32_t schema_id; + size_t num_messages; - if (auto error = m_table_metadata_decompressor.try_read_numeric_value(schema_id); + if (auto error = m_table_metadata_decompressor.try_read_numeric_value(table_id); ErrorCodeSuccess != error) { throw OperationFailed(error, __FILENAME__, __LINE__); } - if (auto error = m_table_metadata_decompressor.try_read_numeric_value(num_messages); + if (auto error = m_table_metadata_decompressor.try_read_numeric_value(table_offset); ErrorCodeSuccess != error) { throw OperationFailed(error, __FILENAME__, __LINE__); } - if (auto error = m_table_metadata_decompressor.try_read_numeric_value(table_offset); + if (table_offset > m_table_reader.get_uncompressed_table_size(table_id)) { + throw OperationFailed(ErrorCodeCorrupt, __FILENAME__, __LINE__); + } + + if (auto error = m_table_metadata_decompressor.try_read_numeric_value(schema_id); ErrorCodeSuccess != error) { throw OperationFailed(error, __FILENAME__, __LINE__); } - if (auto error = m_table_metadata_decompressor.try_read_numeric_value(uncompressed_size); + if (auto error = m_table_metadata_decompressor.try_read_numeric_value(num_messages); ErrorCodeSuccess != error) { throw OperationFailed(error, __FILENAME__, __LINE__); } - m_id_to_table_metadata[schema_id] = {num_messages, table_offset, uncompressed_size}; + if (prev_metadata_initialized) { + size_t uncompressed_size{0}; + if (table_id != prev_metadata.table_id) { + uncompressed_size + = m_table_reader.get_uncompressed_table_size(prev_metadata.table_id) + - prev_metadata.table_offset; + } else { + uncompressed_size = table_offset - prev_metadata.table_offset; + } + prev_metadata.uncompressed_size = uncompressed_size; + m_id_to_schema_metadata[prev_schema_id] = prev_metadata; + } else { + prev_metadata_initialized = true; + } + prev_metadata = {table_id, table_offset, num_messages, 0}; + prev_schema_id = schema_id; m_schema_ids.push_back(schema_id); } + prev_metadata.uncompressed_size + = m_table_reader.get_uncompressed_table_size(prev_metadata.table_id) + - prev_metadata.table_offset; + m_id_to_schema_metadata[prev_schema_id] = prev_metadata; m_table_metadata_decompressor.close(); } @@ -89,14 +117,12 @@ void ArchiveReader::read_dictionaries_and_metadata() { read_metadata(); } -SchemaReader& ArchiveReader::read_table( +SchemaReader& ArchiveReader::read_schema_table( int32_t schema_id, bool should_extract_timestamp, bool should_marshal_records ) { - constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024; // 64 KB - - if (m_id_to_table_metadata.count(schema_id) == 0) { + if (m_id_to_schema_metadata.count(schema_id) == 0) { throw OperationFailed(ErrorCodeFileNotFound, __FILENAME__, __LINE__); } @@ -107,30 +133,26 @@ SchemaReader& ArchiveReader::read_table( should_marshal_records ); - m_tables_file_reader.try_seek_from_begin(m_id_to_table_metadata[schema_id].offset); - m_tables_decompressor.open(m_tables_file_reader, cDecompressorFileReadBufferCapacity); - m_schema_reader.load( - m_tables_decompressor, - m_id_to_table_metadata[schema_id].uncompressed_size - ); - m_tables_decompressor.close_for_reuse(); + auto& schema_metadata = m_id_to_schema_metadata[schema_id]; + auto table_buffer = read_table(schema_metadata.table_id, true); + m_schema_reader + .load(table_buffer, schema_metadata.table_offset, schema_metadata.uncompressed_size); return m_schema_reader; } std::vector> ArchiveReader::read_all_tables() { - constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024; // 64 KB - std::vector> readers; - readers.reserve(m_id_to_table_metadata.size()); - for (auto const& [id, table_metadata] : m_id_to_table_metadata) { + readers.reserve(m_id_to_schema_metadata.size()); + for (auto schema_id : m_schema_ids) { auto schema_reader = std::make_shared(); - initialize_schema_reader(*schema_reader, id, true, true); - - m_tables_file_reader.try_seek_from_begin(table_metadata.offset); - m_tables_decompressor.open(m_tables_file_reader, cDecompressorFileReadBufferCapacity); - schema_reader->load(m_tables_decompressor, table_metadata.uncompressed_size); - m_tables_decompressor.close_for_reuse(); - + initialize_schema_reader(*schema_reader, schema_id, true, true); + auto& schema_metadata = m_id_to_schema_metadata[schema_id]; + auto table_buffer = read_table(schema_metadata.table_id, false); + schema_reader->load( + table_buffer, + schema_metadata.table_offset, + schema_metadata.uncompressed_size + ); readers.push_back(std::move(schema_reader)); } return readers; @@ -237,7 +259,7 @@ void ArchiveReader::initialize_schema_reader( m_schema_tree, schema_id, schema.get_ordered_schema_view(), - m_id_to_table_metadata[schema_id].num_messages, + m_id_to_schema_metadata[schema_id].num_messages, should_marshal_records ); auto timestamp_column_ids = m_timestamp_dict->get_authoritative_timestamp_column_ids(); @@ -284,9 +306,8 @@ void ArchiveReader::initialize_schema_reader( void ArchiveReader::store(FileWriter& writer) { std::string message; - - for (auto& [id, table_metadata] : m_id_to_table_metadata) { - auto& schema_reader = read_table(id, false, true); + for (auto schema_id : m_schema_ids) { + auto& schema_reader = read_schema_table(schema_id, false, true); while (schema_reader.get_next_message(message)) { writer.write(message.c_str(), message.length()); } @@ -304,11 +325,28 @@ void ArchiveReader::close() { m_array_dict->close(); m_timestamp_dict->close(); - m_tables_file_reader.close(); + m_table_reader.close(); m_table_metadata_file_reader.close(); - m_id_to_table_metadata.clear(); + m_id_to_schema_metadata.clear(); m_schema_ids.clear(); + m_cur_table_id = 0; + m_table_buffer.reset(); + m_table_buffer_size = 0ULL; } +std::shared_ptr ArchiveReader::read_table(size_t table_id, bool reuse_buffer) { + if (nullptr != m_table_buffer && m_cur_table_id == table_id) { + return m_table_buffer; + } + + if (false == reuse_buffer) { + m_table_buffer.reset(); + m_table_buffer_size = 0; + } + + m_table_reader.read_table(table_id, m_table_buffer, m_table_buffer_size); + m_cur_table_id = table_id; + return m_table_buffer; +} } // namespace clp_s diff --git a/components/core/src/clp_s/ArchiveReader.hpp b/components/core/src/clp_s/ArchiveReader.hpp index 91fcc1a94..02755592a 100644 --- a/components/core/src/clp_s/ArchiveReader.hpp +++ b/components/core/src/clp_s/ArchiveReader.hpp @@ -12,6 +12,7 @@ #include "DictionaryReader.hpp" #include "ReaderUtils.hpp" #include "SchemaReader.hpp" +#include "TableReader.hpp" #include "TimestampDictionaryReader.hpp" #include "Utils.hpp" @@ -91,8 +92,11 @@ class ArchiveReader { * @param should_marshal_records * @return the schema reader */ - SchemaReader& - read_table(int32_t schema_id, bool should_extract_timestamp, bool should_marshal_records); + SchemaReader& read_schema_table( + int32_t schema_id, + bool should_extract_timestamp, + bool should_marshal_records + ); /** * Loads all of the tables in the archive and returns SchemaReaders for them. @@ -171,6 +175,14 @@ class ArchiveReader { bool should_marshal_records ); + /** + * Reads a table with given ID from the table reader. If read_table is called multiple times in + * a row for the same table_id a cached buffer is returned. This function allows the caller to + * ask for the same buffer to be reused to read multiple different tables: this can save memory + * allocations, but can only be used when tables are read one at a time. + */ + std::shared_ptr read_table(size_t table_id, bool reuse_buffer); + bool m_is_open; std::string m_archive_id; std::shared_ptr m_var_dict; @@ -181,13 +193,15 @@ class ArchiveReader { std::shared_ptr m_schema_tree; std::shared_ptr m_schema_map; std::vector m_schema_ids; - std::map m_id_to_table_metadata; + std::map m_id_to_schema_metadata; - FileReader m_tables_file_reader; + TableReader m_table_reader; FileReader m_table_metadata_file_reader; - ZstdDecompressor m_tables_decompressor; ZstdDecompressor m_table_metadata_decompressor; SchemaReader m_schema_reader; + std::shared_ptr m_table_buffer{}; + size_t m_table_buffer_size{0ULL}; + size_t m_cur_table_id{0ULL}; }; } // namespace clp_s diff --git a/components/core/src/clp_s/CMakeLists.txt b/components/core/src/clp_s/CMakeLists.txt index c8cf08b22..cc1fd78cc 100644 --- a/components/core/src/clp_s/CMakeLists.txt +++ b/components/core/src/clp_s/CMakeLists.txt @@ -78,6 +78,8 @@ set( SchemaTree.hpp SchemaWriter.cpp SchemaWriter.hpp + TableReader.cpp + TableReader.hpp TimestampDictionaryReader.cpp TimestampDictionaryReader.hpp TimestampDictionaryWriter.cpp diff --git a/components/core/src/clp_s/SchemaReader.cpp b/components/core/src/clp_s/SchemaReader.cpp index 03edebf69..265e772d8 100644 --- a/components/core/src/clp_s/SchemaReader.cpp +++ b/components/core/src/clp_s/SchemaReader.cpp @@ -37,17 +37,13 @@ void SchemaReader::mark_column_as_timestamp(BaseColumnReader* column_reader) { } } -void SchemaReader::load(ZstdDecompressor& decompressor, size_t uncompressed_size) { - if (uncompressed_size > m_table_buffer_size) { - m_table_buffer = std::make_unique(uncompressed_size); - m_table_buffer_size = uncompressed_size; - } - auto error = decompressor.try_read_exact_length(m_table_buffer.get(), uncompressed_size); - if (ErrorCodeSuccess != error) { - throw OperationFailed(error, __FILENAME__, __LINE__); - } - - BufferViewReader buffer_reader{m_table_buffer.get(), uncompressed_size}; +void SchemaReader::load( + std::shared_ptr table_buffer, + size_t offset, + size_t uncompressed_size +) { + m_table_buffer = table_buffer; + BufferViewReader buffer_reader{m_table_buffer.get() + offset, uncompressed_size}; for (auto& reader : m_columns) { reader->load(buffer_reader, m_num_messages); } diff --git a/components/core/src/clp_s/SchemaReader.hpp b/components/core/src/clp_s/SchemaReader.hpp index 3639560f6..300bc47c8 100644 --- a/components/core/src/clp_s/SchemaReader.hpp +++ b/components/core/src/clp_s/SchemaReader.hpp @@ -1,6 +1,7 @@ #ifndef CLP_S_SCHEMAREADER_HPP #define CLP_S_SCHEMAREADER_HPP +#include #include #include #include @@ -11,7 +12,6 @@ #include "FileReader.hpp" #include "JsonSerializer.hpp" #include "SchemaTree.hpp" -#include "ZstdDecompressor.hpp" namespace clp_s { class SchemaReader; @@ -47,9 +47,10 @@ class SchemaReader { : TraceableException(error_code, filename, line_number) {} }; - struct TableMetadata { - uint64_t num_messages; - size_t offset; + struct SchemaMetadata { + size_t table_id; + size_t table_offset; + size_t num_messages; size_t uncompressed_size; }; @@ -130,11 +131,12 @@ class SchemaReader { ); /** - * Loads the encoded messages - * @param decompressor + * Loads the encoded messages from a shared buffer starting at a given offset + * @param table_buffer + * @param offset * @param uncompressed_size */ - void load(ZstdDecompressor& decompressor, size_t uncompressed_size); + void load(std::shared_ptr table_buffer, size_t offset, size_t uncompressed_size); /** * Gets next message @@ -277,8 +279,7 @@ class SchemaReader { std::unordered_map m_column_map; std::vector m_columns; std::vector m_reordered_columns; - std::unique_ptr m_table_buffer; - size_t m_table_buffer_size{0}; + std::shared_ptr m_table_buffer; BaseColumnReader* m_timestamp_column; std::function m_get_timestamp; diff --git a/components/core/src/clp_s/TableReader.cpp b/components/core/src/clp_s/TableReader.cpp new file mode 100644 index 000000000..969824356 --- /dev/null +++ b/components/core/src/clp_s/TableReader.cpp @@ -0,0 +1,108 @@ +#include "TableReader.hpp" + +namespace clp_s { + +void TableReader::read_metadata(ZstdDecompressor& decompressor) { + switch (m_state) { + case TableReaderState::Uninitialized: + m_state = TableReaderState::MetadataRead; + break; + case TableReaderState::TablesOpened: + m_state = TableReaderState::TablesOpenedAndMetadataRead; + break; + default: + throw OperationFailed(ErrorCodeNotReady, __FILE__, __LINE__); + } + + size_t num_tables; + if (auto error = decompressor.try_read_numeric_value(num_tables); ErrorCodeSuccess != error) { + throw OperationFailed(error, __FILE__, __LINE__); + } + m_table_metadata.reserve(num_tables); + + for (size_t i = 0; i < num_tables; ++i) { + size_t file_offset; + size_t uncompressed_size; + + if (auto error = decompressor.try_read_numeric_value(file_offset); + ErrorCodeSuccess != error) + { + throw OperationFailed(error, __FILE__, __LINE__); + } + + if (auto error = decompressor.try_read_numeric_value(uncompressed_size); + ErrorCodeSuccess != error) + { + throw OperationFailed(error, __FILE__, __LINE__); + } + + m_table_metadata.emplace_back(file_offset, uncompressed_size); + } +} + +void TableReader::open_tables(std::string const& tables_file_path) { + switch (m_state) { + case TableReaderState::Uninitialized: + m_state = TableReaderState::TablesOpened; + break; + case TableReaderState::MetadataRead: + m_state = TableReaderState::TablesOpenedAndMetadataRead; + break; + default: + throw OperationFailed(ErrorCodeNotReady, __FILE__, __LINE__); + } + m_tables_reader.open(tables_file_path); +} + +void TableReader::close() { + switch (m_state) { + case TableReaderState::TablesOpened: + case TableReaderState::TablesOpenedAndMetadataRead: + case TableReaderState::ReadingTables: + break; + default: + throw OperationFailed(ErrorCodeNotReady, __FILE__, __LINE__); + } + m_tables_reader.close(); + m_previous_table_id = 0; + m_table_metadata.clear(); + m_state = TableReaderState::Uninitialized; +} + +void TableReader::read_table(size_t table_id, std::shared_ptr& buf, size_t& buf_size) { + constexpr size_t cDecompressorFileReadBufferCapacity = 64 * 1024; // 64 KB + if (table_id > m_table_metadata.size()) { + throw OperationFailed(ErrorCodeCorrupt, __FILE__, __LINE__); + } + + switch (m_state) { + case TableReaderState::TablesOpenedAndMetadataRead: + m_state = TableReaderState::ReadingTables; + break; + case TableReaderState::ReadingTables: + if (m_previous_table_id >= table_id) { + throw OperationFailed(ErrorCodeBadParam, __FILE__, __LINE__); + } + break; + default: + throw OperationFailed(ErrorCodeNotReady, __FILE__, __LINE__); + } + m_previous_table_id = table_id; + + auto& [file_offset, uncompressed_size] = m_table_metadata[table_id]; + m_tables_reader.try_seek_from_begin(file_offset); + m_tables_decompressor.open(m_tables_reader, cDecompressorFileReadBufferCapacity); + if (buf_size < uncompressed_size) { + // make_shared is supposed to work here for c++20, but it seems like the compiler version + // we use doesn't support it, so we convert a unique_ptr to a shared_ptr instead. + buf = std::make_unique(uncompressed_size); + buf_size = uncompressed_size; + } + if (auto error = m_tables_decompressor.try_read_exact_length(buf.get(), uncompressed_size); + ErrorCodeSuccess != error) + { + throw OperationFailed(error, __FILE__, __LINE__); + } + m_tables_decompressor.close_for_reuse(); +} +} // namespace clp_s diff --git a/components/core/src/clp_s/TableReader.hpp b/components/core/src/clp_s/TableReader.hpp new file mode 100644 index 000000000..116d1b957 --- /dev/null +++ b/components/core/src/clp_s/TableReader.hpp @@ -0,0 +1,89 @@ +#ifndef CLP_S_TABLEREADER_HPP +#define CLP_S_TABLEREADER_HPP + +#include +#include +#include +#include + +#include "FileReader.hpp" +#include "ZstdDecompressor.hpp" + +namespace clp_s { +/** + * TableReader ensures that the tables section of an archive is read safely. Any attempt to read the + * tables section without loading the tables metadata, and any attempt to read tables section out of + * order will throw. As well, any incorrect usage of this class (e.g. closing without opening) will + * throw. + */ +class TableReader { +public: + class OperationFailed : public TraceableException { + public: + // Constructors + OperationFailed(ErrorCode error_code, char const* const filename, int line_number) + : TraceableException(error_code, filename, line_number) {} + }; + + struct TableMetadata { + size_t file_offset; + size_t uncompressed_size; + }; + + /** + * Reads table metadata from the provided compression stream. Must be invoked before reading + * tables. + */ + void read_metadata(ZstdDecompressor& decompressor); + + /** + * Opens a file reader for the tables section. Must be invoked before reading tables. + */ + void open_tables(std::string const& tables_file_path); + + /** + * Closes the file reader for the tables section. + */ + void close(); + + /** + * Decompresses a table with a given table_id and returns it. This function must be called + * strictly in ascending table_id order. If this function is called twice for the same table or + * if a table with lower id is requested after a table with higher id then an error is thrown. + * + * Note: the buffer and buffer size are returned by reference. This is to support the use case + * where the caller wants to re-use the same buffer for multiple tables to avoid allocations + * when they already have a sufficiently large buffer. If no buffer is provided or the provided + * buffer is too small calling read_table will create a buffer exactly as large as the table + * being decompressed. + * + * @param table_id + * @param buf + * @param buf_size + * @return a shared_ptr to a buffer containing the requested table + */ + void read_table(size_t table_id, std::shared_ptr& buf, size_t& buf_size); + + size_t get_uncompressed_table_size(size_t table_id) const { + return m_table_metadata.at(table_id).uncompressed_size; + } + +private: + enum TableReaderState { + Uninitialized, + MetadataRead, + TablesOpened, + TablesOpenedAndMetadataRead, + ReadingTables + }; + + std::vector m_table_metadata; + FileReader m_tables_reader; + ZstdDecompressor m_tables_decompressor; + TableReaderState m_state{TableReaderState::Uninitialized}; + size_t m_previous_table_id{0ULL}; +}; + +} // namespace clp_s + +#endif // CLP_S_TABLEREADER_HPP diff --git a/components/core/src/clp_s/search/Output.cpp b/components/core/src/clp_s/search/Output.cpp index b6a3b8fe0..4d36b4e29 100644 --- a/components/core/src/clp_s/search/Output.cpp +++ b/components/core/src/clp_s/search/Output.cpp @@ -84,7 +84,7 @@ bool Output::filter() { add_wildcard_columns_to_searched_columns(); - auto& reader = m_archive_reader->read_table( + auto& reader = m_archive_reader->read_schema_table( schema_id, m_output_handler->should_output_metadata(), m_should_marshal_records