From 3117ec02821c5b04a403dc2b63e76d46dddfec4c Mon Sep 17 00:00:00 2001 From: Yukim1 <121286183+zwhzzz0821@users.noreply.github.com> Date: Mon, 25 Nov 2024 10:02:02 +0800 Subject: [PATCH] fix flush bug and fix empty file bug (#310) * fix flush bug and fix empty file bug * add ut --- cpp/src/common/tsfile_common.cc | 3 - cpp/src/file/tsfile_io_writer.cc | 4 +- cpp/src/reader/bloom_filter.cc | 6 +- cpp/src/writer/tsfile_writer.cc | 26 ++++-- cpp/src/writer/tsfile_writer.h | 6 +- cpp/src/writer/value_chunk_writer.cc | 6 ++ cpp/src/writer/value_chunk_writer.h | 2 + cpp/test/common/tsfile_common_test.cc | 2 +- cpp/test/writer/tsfile_writer_test.cc | 124 +++++++++++++++++++++++++- 9 files changed, 161 insertions(+), 18 deletions(-) diff --git a/cpp/src/common/tsfile_common.cc b/cpp/src/common/tsfile_common.cc index a1643439a..ba2c24236 100644 --- a/cpp/src/common/tsfile_common.cc +++ b/cpp/src/common/tsfile_common.cc @@ -92,9 +92,6 @@ int TSMIterator::init() { // FIXME empty list chunk_group_meta_iter_ = chunk_group_meta_list_.begin(); - if (chunk_group_meta_iter_ == chunk_group_meta_list_.end()) { - return E_NOT_EXIST; - } while (chunk_group_meta_iter_ != chunk_group_meta_list_.end()) { chunk_meta_iter_ = chunk_group_meta_iter_.get()->chunk_meta_list_.begin(); diff --git a/cpp/src/file/tsfile_io_writer.cc b/cpp/src/file/tsfile_io_writer.cc index abd08b821..99c6ec64f 100644 --- a/cpp/src/file/tsfile_io_writer.cc +++ b/cpp/src/file/tsfile_io_writer.cc @@ -392,8 +392,8 @@ int TsFileIOWriter::write_file_index() { ret = E_OK; } ASSERT(ret == E_OK); - - if (IS_SUCC(ret)) { // iter finish + if (IS_SUCC(ret) && cur_index_node != nullptr && + cur_index_node_queue != nullptr) { // iter finish ASSERT(cur_index_node != nullptr); ASSERT(cur_index_node_queue != nullptr); if (RET_FAIL(add_cur_index_node_to_queue(cur_index_node, diff --git a/cpp/src/reader/bloom_filter.cc b/cpp/src/reader/bloom_filter.cc index d2afd6644..1ff1109dc 100644 --- a/cpp/src/reader/bloom_filter.cc +++ b/cpp/src/reader/bloom_filter.cc @@ -223,8 +223,6 @@ int BloomFilter::serialize_to(ByteStream &out) { uint8_t *filter_data_bytes = nullptr; int32_t filter_data_bytes_len = 0; bitset_.to_bytes(filter_data_bytes, filter_data_bytes_len); - ASSERT(filter_data_bytes_len > 0); - if (RET_FAIL( SerializationUtil::write_var_uint(filter_data_bytes_len, out))) { } else if (RET_FAIL( @@ -233,7 +231,9 @@ int BloomFilter::serialize_to(ByteStream &out) { } else if (RET_FAIL( SerializationUtil::write_var_uint(hash_func_count_, out))) { } - bitset_.revert_bytes(filter_data_bytes); + if (filter_data_bytes_len > 0) { + bitset_.revert_bytes(filter_data_bytes); + } return ret; } diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc index f48ddfd75..8cc0b6d4d 100644 --- a/cpp/src/writer/tsfile_writer.cc +++ b/cpp/src/writer/tsfile_writer.cc @@ -747,7 +747,12 @@ int TsFileWriter::flush() { return ret; } } + if (check_chunk_group_empty(device_iter->second, + device_iter->second->is_aligned_)) { + continue; + } bool is_aligned = device_iter->second->is_aligned_; + if (RET_FAIL(io_writer_->start_flush_chunk_group(device_iter->first, is_aligned))) { } else if (RET_FAIL( @@ -759,17 +764,24 @@ int TsFileWriter::flush() { return ret; } -bool TsFileWriter::check_chunk_group_empty( - MeasurementSchemaGroup *chunk_group) { +bool TsFileWriter::check_chunk_group_empty(MeasurementSchemaGroup *chunk_group, + bool is_aligned) { MeasurementSchemaMap &map = chunk_group->measurement_schema_map_; for (MeasurementSchemaMapIter ms_iter = map.begin(); ms_iter != map.end(); ms_iter++) { MeasurementSchema *m_schema = ms_iter->second; - if (m_schema->chunk_writer_ != NULL && - m_schema->chunk_writer_->hasData()) { - // first condition is to avoid first flush empty chunk group - // second condition is to avoid repeated flush - return false; + if (is_aligned) { + if (m_schema->value_chunk_writer_ != NULL && + m_schema->value_chunk_writer_->hasData()) { + return false; + } + } else { + if (m_schema->chunk_writer_ != NULL && + m_schema->chunk_writer_->hasData()) { + // first condition is to avoid first flush empty chunk group + // second condition is to avoid repeated flush + return false; + } } } return true; diff --git a/cpp/src/writer/tsfile_writer.h b/cpp/src/writer/tsfile_writer.h index 128715a47..7b43b850f 100644 --- a/cpp/src/writer/tsfile_writer.h +++ b/cpp/src/writer/tsfile_writer.h @@ -69,6 +69,9 @@ class TsFileWriter { int write_tablet(const Tablet &tablet); int write_record_aligned(const TsRecord &record); int write_tablet_aligned(const Tablet &tablet); + std::map *get_schema_group_map() { + return &schemas_; + } int64_t calculate_mem_size_for_all_group(); int check_memory_size_and_may_flush_chunks(); /* @@ -86,7 +89,8 @@ class TsFileWriter { private: int write_point(storage::ChunkWriter *chunk_writer, int64_t timestamp, const DataPoint &point); - bool check_chunk_group_empty(MeasurementSchemaGroup *chunk_group); + bool check_chunk_group_empty(MeasurementSchemaGroup *chunk_group, + bool is_aligned); int write_point_aligned(ValueChunkWriter *value_chunk_writer, int64_t timestamp, const DataPoint &point); int flush_chunk_group(MeasurementSchemaGroup *chunk_group, bool is_aligned); diff --git a/cpp/src/writer/value_chunk_writer.cc b/cpp/src/writer/value_chunk_writer.cc index b32b2dabf..4156eaff9 100644 --- a/cpp/src/writer/value_chunk_writer.cc +++ b/cpp/src/writer/value_chunk_writer.cc @@ -168,4 +168,10 @@ int64_t ValueChunkWriter::estimate_max_series_mem_size() { value_page_writer_.get_statistic()->get_type()); } +bool ValueChunkWriter::hasData() { + return num_of_pages_ > 0 || + (value_page_writer_.get_statistic() != nullptr && + value_page_writer_.get_statistic()->count_ > 0); +} + } // end namespace storage diff --git a/cpp/src/writer/value_chunk_writer.h b/cpp/src/writer/value_chunk_writer.h index 3093dfffc..10f51c758 100644 --- a/cpp/src/writer/value_chunk_writer.h +++ b/cpp/src/writer/value_chunk_writer.h @@ -87,6 +87,8 @@ class ValueChunkWriter { int64_t estimate_max_series_mem_size(); + bool hasData(); + private: FORCE_INLINE bool is_cur_page_full() const { // FIXME diff --git a/cpp/test/common/tsfile_common_test.cc b/cpp/test/common/tsfile_common_test.cc index 08673458f..2c9d14038 100644 --- a/cpp/test/common/tsfile_common_test.cc +++ b/cpp/test/common/tsfile_common_test.cc @@ -203,7 +203,7 @@ TEST_F(TSMIteratorTest, InitEmptyList) { common::PageArena arena; common::SimpleList empty_list(&arena); TSMIterator iter(empty_list); - ASSERT_EQ(iter.init(), common::E_NOT_EXIST); + ASSERT_EQ(iter.init(), common::E_OK); } TEST_F(TSMIteratorTest, HasNext) { diff --git a/cpp/test/writer/tsfile_writer_test.cc b/cpp/test/writer/tsfile_writer_test.cc index d69a9ff19..6b6011265 100644 --- a/cpp/test/writer/tsfile_writer_test.cc +++ b/cpp/test/writer/tsfile_writer_test.cc @@ -30,7 +30,7 @@ #include "file/write_file.h" #include "reader/qds_without_timegenerator.h" #include "reader/tsfile_reader.h" - +#include "writer/chunk_writer.h" using namespace storage; using namespace common; @@ -381,6 +381,128 @@ TEST_F(TsFileWriterTest, WriteMultipleTabletsDouble) { ASSERT_EQ(tsfile_writer_->close(), E_OK); } + +TEST_F(TsFileWriterTest, FlushMultipleDevice) { + const int device_num = 50; + const int measurement_num = 50; + const int max_rows = 100; + std::vector schema_vec[50]; + + for (int i = 0; i < device_num; i++) { + std::string device_name = "test_device" + std::to_string(i); + for (int j = 0; j < measurement_num; j++) { + std::string measure_name = "measurement" + std::to_string(j); + schema_vec[i].push_back( + MeasurementSchema(measure_name, common::TSDataType::INT64, + common::TSEncoding::PLAIN, + common::CompressionType::UNCOMPRESSED)); + tsfile_writer_->register_timeseries( + device_name, measure_name, common::TSDataType::INT64, + common::TSEncoding::PLAIN, + common::CompressionType::UNCOMPRESSED); + } + } + + for (int i = 0; i < device_num; i++) { + std::string device_name = "test_device" + std::to_string(i); + Tablet tablet(device_name, &schema_vec[i], max_rows); + tablet.init(); + for (int j = 0; j < measurement_num; j++) { + for (int row = 0; row < max_rows; row++) { + tablet.set_timestamp(row, 16225600 + row); + } + for (int row = 0; row < max_rows; row++) { + tablet.set_value(row, j, static_cast(row)); + } + } + ASSERT_EQ(tsfile_writer_->write_tablet(tablet), E_OK); + // flush after write tablet to check whether write empty chunk + ASSERT_EQ(tsfile_writer_->flush(), E_OK); + } + ASSERT_EQ(tsfile_writer_->close(), E_OK); + + std::vector select_list; + for (int i = 0; i < device_num; i++) { + std::string device_name = "test_device" + std::to_string(i); + for (int j = 0; j < measurement_num; j++) { + std::string measurement_name = "measurement" + std::to_string(j); + storage::Path path(device_name, measurement_name); + select_list.push_back(path); + } + } + storage::QueryExpression *query_expr = + storage::QueryExpression::create(select_list, nullptr); + + storage::TsFileReader reader; + int ret = reader.open(file_name_); + ASSERT_EQ(ret, common::E_OK); + storage::QueryDataSet *tmp_qds = nullptr; + + ret = reader.query(query_expr, tmp_qds); + auto *qds = (QDSWithoutTimeGenerator *)tmp_qds; + + storage::RowRecord *record; + int64_t cur_record_num = 0; + do { + record = qds->get_next(); + // if empty chunk is writen, the timestamp should be NULL + if (!record) { + break; + } + EXPECT_EQ(record->get_timestamp(), 16225600 + cur_record_num); + cur_record_num++; + } while (true); + EXPECT_EQ(cur_record_num, max_rows); + storage::QueryExpression::destory(query_expr); + reader.destroy_query_data_set(qds); +} + +TEST_F(TsFileWriterTest, AnalyzeTsfileForload) { + const int device_num = 50; + const int measurement_num = 50; + const int max_rows = 100; + std::vector schema_vec[50]; + + for (int i = 0; i < device_num; i++) { + std::string device_name = "test_device" + std::to_string(i); + for (int j = 0; j < measurement_num; j++) { + std::string measure_name = "measurement" + std::to_string(j); + schema_vec[i].push_back( + MeasurementSchema(measure_name, common::TSDataType::INT64, + common::TSEncoding::PLAIN, + common::CompressionType::UNCOMPRESSED)); + tsfile_writer_->register_timeseries( + device_name, measure_name, common::TSDataType::INT64, + common::TSEncoding::PLAIN, + common::CompressionType::UNCOMPRESSED); + } + } + + for (int i = 0; i < device_num; i++) { + std::string device_name = "test_device" + std::to_string(i); + Tablet tablet(device_name, &schema_vec[i], max_rows); + tablet.init(); + for (int j = 0; j < measurement_num; j++) { + for (int row = 0; row < max_rows; row++) { + tablet.set_timestamp(row, 16225600 + row); + } + for (int row = 0; row < max_rows; row++) { + tablet.set_value(row, j, static_cast(row)); + } + } + ASSERT_EQ(tsfile_writer_->write_tablet(tablet), E_OK); + } + auto schemas = tsfile_writer_->get_schema_group_map(); + ASSERT_EQ(schemas->size(), 50); + for (const auto& device_iter : *schemas) { + for (const auto& chunk_iter : device_iter.second->measurement_schema_map_) { + ASSERT_NE(chunk_iter.second->chunk_writer_, nullptr); + ASSERT_TRUE(chunk_iter.second->chunk_writer_->hasData()); + } + } + ASSERT_EQ(tsfile_writer_->flush(), E_OK); + ASSERT_EQ(tsfile_writer_->close(), E_OK); +} TEST_F(TsFileWriterTest, FlushWithoutWriteAfterRegisterTS) { std::string device_path = "device1"; std::string measurement_name = "temperature";