diff --git a/cpp/examples/cpp_examples/demo_read.cpp b/cpp/examples/cpp_examples/demo_read.cpp index 5b36af3c8..8fab33f99 100644 --- a/cpp/examples/cpp_examples/demo_read.cpp +++ b/cpp/examples/cpp_examples/demo_read.cpp @@ -70,16 +70,16 @@ int demo_read() { std::cout << "begin to query expr" << std::endl; ASSERT(ret == 0); - storage::QueryDataSet *qds = nullptr; + storage::ResultSet *qds = nullptr; ret = reader.query(query_expr, qds); storage::RowRecord *record; std::cout << "begin to dump data from tsfile ---" << std::endl; int row_cout = 0; do { - record = qds->get_next(); - if (record) { + if (qds->next()) { std::cout << "dump QDS : " << record->get_timestamp() << ","; + record = qds->get_row_record(); if (record) { int size = record->get_fields()->size(); for (int i = 0; i < size; ++i) { diff --git a/cpp/examples/cpp_examples/demo_write.cpp b/cpp/examples/cpp_examples/demo_write.cpp index 2fafb4655..17c5cb6ba 100644 --- a/cpp/examples/cpp_examples/demo_write.cpp +++ b/cpp/examples/cpp_examples/demo_write.cpp @@ -66,16 +66,14 @@ int demo_write() { std::vector schema_vec[50]; for (int i = 0; i < device_num; i++) { std::string device_name = "test_device" + std::to_string(i); + schema_vec[i].reserve(measurement_num); for (int j = 0; j < measurement_num; j++) { std::string measure_name = "measurement" + std::to_string(j); - schema_vec[i].push_back( + schema_vec[i].emplace_back( MeasurementSchema(measure_name, common::TSDataType::INT32, common::TSEncoding::PLAIN, common::CompressionType::UNCOMPRESSED)); - tsfile_writer_->register_timeseries( - device_name, measure_name, common::TSDataType::INT32, - common::TSEncoding::PLAIN, - common::CompressionType::UNCOMPRESSED); + tsfile_writer_->register_timeseries(device_name, schema_vec[i][j]); } } diff --git a/cpp/src/common/allocator/my_string.h b/cpp/src/common/allocator/my_string.h index 72bbce3f9..e543d6b0a 100644 --- a/cpp/src/common/allocator/my_string.h +++ b/cpp/src/common/allocator/my_string.h @@ -147,6 +147,7 @@ struct String { return this->len_ < other.len_; } + std::string to_std_string() { return std::string(buf_, len_); } #ifndef NDEBUG friend std::ostream &operator<<(std::ostream &os, const String &s) { diff --git a/cpp/src/common/record.h b/cpp/src/common/record.h index 1fc275d46..1e1374634 100644 --- a/cpp/src/common/record.h +++ b/cpp/src/common/record.h @@ -24,7 +24,7 @@ #include #include "common/db_common.h" - +#include "utils/errno_define.h" namespace storage { // TODO: use std::move @@ -120,22 +120,24 @@ struct DataPoint { struct TsRecord { int64_t timestamp_; - std::string device_name_; + std::string device_id_; std::vector points_; - TsRecord(const std::string &device_name) : device_name_(device_name) {} + TsRecord(const std::string &device_name) : device_id_(device_name) {} TsRecord(int64_t timestamp, const std::string &device_name, int32_t point_count_in_row = 0) - : timestamp_(timestamp), device_name_(device_name), points_() { + : timestamp_(timestamp), device_id_(device_name), points_() { if (point_count_in_row > 0) { points_.reserve(point_count_in_row); } } - void append_data_point(const DataPoint &point) { - // points_.emplace_back(point); C++11 - points_.push_back(point); + template + int add_point(const std::string &measurement_name, T val) { + int ret = common::E_OK; + points_.emplace_back(DataPoint(measurement_name, val)); + return ret; } }; diff --git a/cpp/src/common/row_record.h b/cpp/src/common/row_record.h index 6901b08d7..83a4ff149 100644 --- a/cpp/src/common/row_record.h +++ b/cpp/src/common/row_record.h @@ -86,9 +86,31 @@ struct Field { } } + template + FORCE_INLINE T get_value() { + switch (type_) { + case common::TSDataType::BOOLEAN: + return value_.bval_; + case common::TSDataType::INT32: + return value_.ival_; + case common::TSDataType::INT64: + return value_.lval_; + case common::TSDataType::FLOAT: + return value_.fval_; + case common::TSDataType::DOUBLE: + return value_.dval_; + // case common::TSDataType::TEXT : + // return value_.sval_; + default: + std::cout << "unknown data type" << std::endl; + break; + } + return -1; // when data type is unknown + } + public: common::TSDataType type_; - + std::string column_name; union { bool bval_; int64_t lval_; @@ -182,6 +204,8 @@ class RowRecord { FORCE_INLINE std::vector *get_fields() { return fields_; } + FORCE_INLINE uint32_t get_col_num() { return col_num_; } + private: int64_t time_; // time value uint32_t col_num_; // measurement num diff --git a/cpp/src/common/schema.h b/cpp/src/common/schema.h index 087439dd0..f4710698f 100644 --- a/cpp/src/common/schema.h +++ b/cpp/src/common/schema.h @@ -50,6 +50,15 @@ struct MeasurementSchema { chunk_writer_(nullptr), value_chunk_writer_(nullptr) {} + MeasurementSchema(const std::string &measurement_name, + common::TSDataType data_type) + : measurement_name_(measurement_name), + data_type_(data_type), + encoding_(get_default_encoding_for_type(data_type)), + compression_type_(common::LZ4), + chunk_writer_(nullptr), + value_chunk_writer_(nullptr) {} + MeasurementSchema(const std::string &measurement_name, common::TSDataType data_type, common::TSEncoding encoding, common::CompressionType compression_type) diff --git a/cpp/src/common/tablet.cc b/cpp/src/common/tablet.cc index 1ed92cf4c..90d1227ae 100644 --- a/cpp/src/common/tablet.cc +++ b/cpp/src/common/tablet.cc @@ -29,7 +29,7 @@ namespace storage { int Tablet::init() { ASSERT(timestamps_ == NULL); - timestamps_ = (int64_t *)malloc(sizeof(int64_t) * max_rows_); + timestamps_ = (int64_t *)malloc(sizeof(int64_t) * max_row_num_); size_t schema_count = schema_vec_->size(); std::pair::iterator, bool> ins_res; @@ -48,12 +48,12 @@ int Tablet::init() { for (size_t c = 0; c < schema_count; c++) { const MeasurementSchema &schema = schema_vec_->at(c); value_matrix_[c] = - malloc(get_data_type_size(schema.data_type_) * max_rows_); + malloc(get_data_type_size(schema.data_type_) * max_row_num_); } bitmaps_ = new BitMap[schema_count]; for (size_t c = 0; c < schema_count; c++) { - bitmaps_[c].init(max_rows_, /*init_as_zero=*/true); + bitmaps_[c].init(max_row_num_, /*init_as_zero=*/true); } return E_OK; } @@ -75,9 +75,9 @@ void Tablet::destroy() { } } -int Tablet::set_timestamp(int row_index, int64_t timestamp) { +int Tablet::add_timestamp(uint32_t row_index, int64_t timestamp) { ASSERT(timestamps_ != NULL); - if (UNLIKELY(row_index >= max_rows_)) { + if (UNLIKELY(row_index >= max_row_num_)) { ASSERT(false); return E_OUT_OF_RANGE; } @@ -85,80 +85,60 @@ int Tablet::set_timestamp(int row_index, int64_t timestamp) { return E_OK; } -#define DO_SET_VALUE_BY_COL_NAME(row_index, measurement_name, val) \ - do { \ - SchemaMapIterator find_iter = schema_map_.find(measurement_name); \ - if (LIKELY(find_iter == schema_map_.end())) { \ - ASSERT(false); \ - return E_INVALID_ARG; \ - } \ - return set_value(row_index, find_iter->second, val); \ - } while (false) - -#define DO_SET_VALUE_BY_COL_INDEX(row_index, schema_index, CppType, val) \ - do { \ - if (LIKELY(schema_index >= schema_vec_->size())) { \ - ASSERT(false); \ - return E_OUT_OF_RANGE; \ - } \ - const MeasurementSchema &schema = schema_vec_->at(schema_index); \ - if (LIKELY(GetDataTypeFromTemplateType() != \ - schema.data_type_)) { \ - return E_TYPE_NOT_MATCH; \ - } \ - CppType *column_values = (CppType *)value_matrix_[schema_index]; \ - column_values[row_index] = val; \ - bitmaps_[schema_index].set(row_index); /* mark as non-null*/ \ - } while (false) - -int Tablet::set_value(int row_index, const std::string &measurement_name, - bool val) { - DO_SET_VALUE_BY_COL_NAME(row_index, measurement_name, val); -} - -int Tablet::set_value(int row_index, const std::string &measurement_name, - int32_t val) { - DO_SET_VALUE_BY_COL_NAME(row_index, measurement_name, val); -} - -int Tablet::set_value(int row_index, const std::string &measurement_name, - int64_t val) { - DO_SET_VALUE_BY_COL_NAME(row_index, measurement_name, val); -} - -int Tablet::set_value(int row_index, const std::string &measurement_name, - float val) { - DO_SET_VALUE_BY_COL_NAME(row_index, measurement_name, val); -} - -int Tablet::set_value(int row_index, const std::string &measurement_name, - double val) { - DO_SET_VALUE_BY_COL_NAME(row_index, measurement_name, val); -} - -int Tablet::set_value(int row_index, uint32_t schema_index, bool val) { - DO_SET_VALUE_BY_COL_INDEX(row_index, schema_index, bool, val); - return E_OK; -} - -int Tablet::set_value(int row_index, uint32_t schema_index, int32_t val) { - DO_SET_VALUE_BY_COL_INDEX(row_index, schema_index, int32_t, val); - return E_OK; -} - -int Tablet::set_value(int row_index, uint32_t schema_index, int64_t val) { - DO_SET_VALUE_BY_COL_INDEX(row_index, schema_index, int64_t, val); - return E_OK; -} - -int Tablet::set_value(int row_index, uint32_t schema_index, float val) { - DO_SET_VALUE_BY_COL_INDEX(row_index, schema_index, float, val); - return E_OK; +template +int Tablet::add_value(uint32_t row_index, uint32_t schema_index, T val) { + int ret = common::E_OK; + if (LIKELY(schema_index >= schema_vec_->size())) { + ASSERT(false); + ret = common::E_OUT_OF_RANGE; + } else { + const MeasurementSchema &schema = schema_vec_->at(schema_index); + if (LIKELY(GetDataTypeFromTemplateType() != schema.data_type_)) { + ret = common::E_TYPE_NOT_MATCH; + } else { + T *column_values = (T *)value_matrix_[schema_index]; + column_values[row_index] = val; + bitmaps_[schema_index].set(row_index); /* mark as non-null*/ + } + } + return ret; } -int Tablet::set_value(int row_index, uint32_t schema_index, double val) { - DO_SET_VALUE_BY_COL_INDEX(row_index, schema_index, double, val); - return E_OK; +template +int Tablet::add_value(uint32_t row_index, const std::string &measurement_name, + T val) { + int ret = common::E_OK; + SchemaMapIterator find_iter = schema_map_.find(measurement_name); + if (LIKELY(find_iter == schema_map_.end())) { + ASSERT(false); + ret = E_INVALID_ARG; + } else { + ret = add_value(row_index, find_iter->second, val); + } + return ret; } +template int Tablet::add_value(uint32_t row_index, uint32_t schema_index, + bool val); +template int Tablet::add_value(uint32_t row_index, uint32_t schema_index, + int32_t val); +template int Tablet::add_value(uint32_t row_index, uint32_t schema_index, + int64_t val); +template int Tablet::add_value(uint32_t row_index, uint32_t schema_index, + float val); +template int Tablet::add_value(uint32_t row_index, uint32_t schema_index, + double val); + +template int Tablet::add_value(uint32_t row_index, + const std::string &measurement_name, bool val); +template int Tablet::add_value(uint32_t row_index, + const std::string &measurement_name, + int32_t val); +template int Tablet::add_value(uint32_t row_index, + const std::string &measurement_name, + int64_t val); +template int Tablet::add_value(uint32_t row_index, + const std::string &measurement_name, float val); +template int Tablet::add_value(uint32_t row_index, + const std::string &measurement_name, double val); } // end namespace storage \ No newline at end of file diff --git a/cpp/src/common/tablet.h b/cpp/src/common/tablet.h index 163f2ee87..748045dca 100644 --- a/cpp/src/common/tablet.h +++ b/cpp/src/common/tablet.h @@ -20,6 +20,8 @@ #ifndef COMMON_TABLET_H #define COMMON_TABLET_H +#include +#include #include #include "common/container/bit_map.h" @@ -27,57 +29,76 @@ namespace storage { +template + class TabletRowIterator; class TabletColIterator; class Tablet { public: - static const int DEFAULT_MAX_ROWS = 1024; + static const uint32_t DEFAULT_MAX_ROWS = 1024; public: - Tablet(const std::string &device_name, - const std::vector *schema_vec, - int max_rows = DEFAULT_MAX_ROWS) - : max_rows_(max_rows), - device_name_(device_name), + Tablet(const std::string &device_id, + std::shared_ptr> schema_vec, + uint32_t max_rows = DEFAULT_MAX_ROWS) + : max_row_num_(max_rows), + device_id_(device_id), schema_vec_(schema_vec), timestamps_(NULL), value_matrix_(NULL), bitmaps_(NULL) { - ASSERT(device_name.size() >= 1); + ASSERT(device_id.size() >= 1); ASSERT(schema_vec != NULL); ASSERT(max_rows > 0 && max_rows < (1 << 30)); if (max_rows < 0) { ASSERT(false); - max_rows_ = DEFAULT_MAX_ROWS; + max_row_num_ = DEFAULT_MAX_ROWS; } } + + Tablet(const std::string &device_id, + const std::vector *measurement_list, + const std::vector *data_type_list, + uint32_t max_row_num = DEFAULT_MAX_ROWS) + : max_row_num_(max_row_num), + device_id_(device_id), + timestamps_(NULL), + value_matrix_(NULL), + bitmaps_(NULL) { + ASSERT(device_id.size() >= 1); + ASSERT(measurement_list != NULL); + ASSERT(data_type_list != NULL); + ASSERT(max_row_num > 0 && max_row_num < (1 << 30)); + if (max_row_num < 0) { + ASSERT(false); + max_row_num_ = DEFAULT_MAX_ROWS; + } + + ASSERT(measurement_list->size() == data_type_list->size()); + std::vector measurement_vec; + measurement_vec.reserve(measurement_list->size()); + std::transform(measurement_list->begin(), measurement_list->end(), + data_type_list->begin(), + std::back_inserter(measurement_vec), + [](const std::string &name, common::TSDataType type) { + return MeasurementSchema(name, type); + }); + } ~Tablet() { destroy(); } int init(); void destroy(); size_t get_column_count() const { return schema_vec_->size(); } - int set_timestamp(int row_index, int64_t timestamp); - - int set_value(int row_index, uint32_t schema_index, bool val); - int set_value(int row_index, uint32_t schema_index, int32_t val); - int set_value(int row_index, uint32_t schema_index, int64_t val); - int set_value(int row_index, uint32_t schema_index, float val); - int set_value(int row_index, uint32_t schema_index, double val); - // int set_value(int row_index, int schema_index, double val); - - int set_value(int row_index, const std::string &measurement_name, bool val); - int set_value(int row_index, const std::string &measurement_name, - int32_t val); - int set_value(int row_index, const std::string &measurement_name, - int64_t val); - int set_value(int row_index, const std::string &measurement_name, - float val); - int set_value(int row_index, const std::string &measurement_name, - double val); - // int set_value(int row_index, const std::string &measurement_name, double - // val); + int add_timestamp(uint32_t row_index, int64_t timestamp); + + template + int add_value(uint32_t row_index, uint32_t schema_index, T val); + + template + int add_value(uint32_t row_index, const std::string &measurement_name, + T val); friend class TabletColIterator; friend class TsFileWriter; @@ -87,9 +108,9 @@ class Tablet { typedef std::map::iterator SchemaMapIterator; private: - int max_rows_; - std::string device_name_; - const std::vector *schema_vec_; + uint32_t max_row_num_; + std::string device_id_; + std::shared_ptr> schema_vec_; std::map schema_map_; int64_t *timestamps_; void **value_matrix_; diff --git a/cpp/src/cwrapper/TsFile-cwrapper.cc b/cpp/src/cwrapper/TsFile-cwrapper.cc index d19c47694..60efca08f 100644 --- a/cpp/src/cwrapper/TsFile-cwrapper.cc +++ b/cpp/src/cwrapper/TsFile-cwrapper.cc @@ -27,7 +27,7 @@ #include "reader/filter/filter.h" #include "reader/filter/time_filter.h" #include "reader/filter/time_operator.h" -#include "reader/query_data_set.h" +#include "reader/result_set.h" #include "reader/tsfile_reader.h" #include "utils/errno_define.h" #include "writer/tsfile_writer.h" @@ -206,10 +206,12 @@ ErrorCode tsfile_register_table_column(CTsFileWriter writer, const char* table_name, ColumnSchema* schema) { TsFileWriter* w = (TsFileWriter*)writer; - int ret = w->register_timeseries(table_name, schema->name, - get_datatype(schema->column_def), - get_data_encoding(schema->column_def), - get_data_compression(schema->column_def)); + + int ret = w->register_timeseries( + table_name, storage::MeasurementSchema( + schema->name, get_datatype(schema->column_def), + get_data_encoding(schema->column_def), + get_data_compression(schema->column_def))); return ret; } @@ -218,11 +220,12 @@ ErrorCode tsfile_register_table(CTsFileWriter writer, TsFileWriter* w = (TsFileWriter*)writer; for (int column_id = 0; column_id < table_schema->column_num; column_id++) { ColumnSchema* schema = table_schema->column_schema[column_id]; - ErrorCode ret = - w->register_timeseries(table_schema->table_name, schema->name, - get_datatype(schema->column_def), - get_data_encoding(schema->column_def), - get_data_compression(schema->column_def)); + ErrorCode ret = w->register_timeseries( + table_schema->table_name, + storage::MeasurementSchema( + schema->name, get_datatype(schema->column_def), + get_data_encoding(schema->column_def), + get_data_compression(schema->column_def))); if (ret != E_OK) { return ret; } @@ -593,7 +596,7 @@ QueryDataRet ts_reader_query(CTsFileReader reader, const char* table_name, selected_paths.push_back(storage::Path(table_name_str, column_name)); } - storage::QueryDataSet* qds = nullptr; + storage::ResultSet* qds = nullptr; storage::QueryExpression* query_expression = storage::QueryExpression::create(selected_paths, (storage::Expression*)expression); @@ -605,7 +608,6 @@ QueryDataRet ts_reader_query(CTsFileReader reader, const char* table_name, for (int i = 0; i < column_num; i++) { ret->column_names[i] = strdup(columns_name[i]); } - storage::QueryExpression::destory(query_expression); return ret; } @@ -620,7 +622,7 @@ QueryDataRet ts_reader_begin_end(CTsFileReader reader, const char* table_name, selected_paths.push_back(storage::Path(table_name_str, column_name)); } - storage::QueryDataSet* qds = nullptr; + storage::ResultSet* qds = nullptr; storage::Filter* filter_low = nullptr; storage::Filter* filter_high = nullptr; storage::Expression* exp = nullptr; @@ -633,7 +635,8 @@ QueryDataRet ts_reader_begin_end(CTsFileReader reader, const char* table_name, } if (filter_low != nullptr && filter_high != nullptr) { and_filter = new storage::AndFilter(filter_low, filter_high); - exp = new storage::Expression(storage::GLOBALTIME_EXPR, and_filter); + exp = new storage::Expression(storage::GLOBALTIME_EXPR, + and_filter); // exp never be deleted } else if (filter_low != nullptr && filter_high == nullptr) { exp = new storage::Expression(storage::GLOBALTIME_EXPR, filter_low); } else if (filter_high != nullptr && filter_low == nullptr) { @@ -649,7 +652,6 @@ QueryDataRet ts_reader_begin_end(CTsFileReader reader, const char* table_name, for (int i = 0; i < column_num; i++) { ret->column_names[i] = strdup(columns_name[i]); } - storage::QueryExpression::destory(query_expr); return ret; } @@ -662,7 +664,7 @@ QueryDataRet ts_reader_read(CTsFileReader reader, const char* table_name, std::string column_name(columns_name[i]); selected_paths.push_back(storage::Path(table_name_str, column_name)); } - storage::QueryDataSet* qds = nullptr; + storage::ResultSet* qds = nullptr; storage::QueryExpression* query_expr = storage::QueryExpression::create(selected_paths, nullptr); r->query(query_expr, qds); @@ -673,12 +675,11 @@ QueryDataRet ts_reader_read(CTsFileReader reader, const char* table_name, for (int i = 0; i < column_num; i++) { ret->column_names[i] = strdup(columns_name[i]); } - storage::QueryExpression::destory(query_expr); return ret; } ErrorCode destory_query_dataret(QueryDataRet data) { - storage::QueryDataSet* qds = (storage::QueryDataSet*)data->data; + storage::ResultSet* qds = (storage::ResultSet*)data->data; delete qds; for (int i = 0; i < data->column_num; i++) { free(data->column_names[i]); @@ -689,17 +690,17 @@ ErrorCode destory_query_dataret(QueryDataRet data) { } DataResult* ts_next(QueryDataRet data, int expect_line_count) { - storage::QueryDataSet* qds = (storage::QueryDataSet*)data->data; + storage::ResultSet* qds = (storage::ResultSet*)data->data; DataResult* result = create_tablet("result", expect_line_count); storage::RowRecord* record; bool init_tablet = false; for (int i = 0; i < expect_line_count; i++) { - record = qds->get_next(); - if (record == nullptr) { + if (!qds->next()) { break; - std::cout << "record null now" + std::cout << "no more record now" << "i = " << i << std::endl; } + record = qds->get_row_record(); int column_num = record->get_fields()->size(); if (!init_tablet) { for (int col = 0; col < column_num; col++) { diff --git a/cpp/src/encoding/encoder_factory.h b/cpp/src/encoding/encoder_factory.h index e9e02e4b2..0e582ae3b 100644 --- a/cpp/src/encoding/encoder_factory.h +++ b/cpp/src/encoding/encoder_factory.h @@ -55,6 +55,18 @@ class EncoderFactory { } } + static Encoder *alloc_time_encoder(common::TSEncoding encoding) { + if (encoding == common::PLAIN) { + ALLOC_AND_RETURN_ENCODER(PlainEncoder); + } else if (encoding == common::TS_2DIFF) { + ALLOC_AND_RETURN_ENCODER(LongTS2DIFFEncoder); + } else { + // not support now + ASSERT(false); + return nullptr; + } + } + static Encoder *alloc_value_encoder(common::TSEncoding encoding, common::TSDataType data_type) { if (encoding == common::PLAIN) { diff --git a/cpp/src/file/tsfile_io_reader.cc b/cpp/src/file/tsfile_io_reader.cc index cba47f533..218672d95 100644 --- a/cpp/src/file/tsfile_io_reader.cc +++ b/cpp/src/file/tsfile_io_reader.cc @@ -88,6 +88,25 @@ void TsFileIOReader::revert_ssi(TsFileSeriesScanIterator *ssi) { } } +int TsFileIOReader::get_device_timeseries_meta_without_chunk_meta( + std::string device_id, std::vector ×eries_indexs, + PageArena &pa) { + int ret = E_OK; + load_tsfile_meta_if_necessary(); + MetaIndexEntry meta_index_entry; + int64_t end_offset; + std::vector> meta_index_entry_list; + if (RET_FAIL( + load_device_index_entry(device_id, meta_index_entry, end_offset))) { + } else if (RET_FAIL(load_all_measurement_index_entry( + meta_index_entry.offset_, end_offset, pa, + meta_index_entry_list))) { + } else if (RET_FAIL(do_load_all_timeseries_index(meta_index_entry_list, pa, + timeseries_indexs))) { + } + return ret; +} + bool TsFileIOReader::filter_stasify(ITimeseriesIndex *ts_index, Filter *time_filter) { ASSERT(ts_index->get_statistic() != nullptr); @@ -304,6 +323,47 @@ int TsFileIOReader::load_measurement_index_entry( return ret; } +int TsFileIOReader::load_all_measurement_index_entry( + int64_t start_offset, int64_t end_offset, common::PageArena &pa, + std::vector> + &ret_measurement_index_entry) { +#if DEBUG_SE + std::cout << "load_measurement_index_entry: measurement_name_str=" + << measurement_name_str << ", start_offset=" << start_offset + << ", end_offset=" << end_offset << std::endl; +#endif + ASSERT(start_offset < end_offset); + int ret = E_OK; + // 1. load top measuremnt_index_node + const int32_t read_size = (int32_t)(end_offset - start_offset); + int32_t ret_read_len = 0; + char *data_buf = (char *)pa.alloc(read_size); + void *m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode)); + if (IS_NULL(data_buf) || IS_NULL(m_idx_node_buf)) { + return E_OOM; + } + MetaIndexNode *top_node = new (m_idx_node_buf) MetaIndexNode(&pa); + if (RET_FAIL(read_file_->read(start_offset, data_buf, read_size, + ret_read_len))) { + } else if (RET_FAIL(top_node->deserialize_from(data_buf, read_size))) { + } +#if DEBUG_SE + std::cout + << "load_measurement_index_entry deserialize MetaIndexNode, top_node=" + << *top_node << " at file pos " << start_offset << " to " << end_offset + << std::endl; +#endif + // 2. search from top_node in top-down way + if (IS_SUCC(ret)) { + get_all_leaf(top_node, ret_measurement_index_entry); + } + if (ret == E_NOT_EXIST) { + ret = E_MEASUREMENT_NOT_EXIST; + } + top_node->children_.~vector(); + return ret; +} + /* * @target_name device_name or measurement_name * @index_node leaf device node or leaf measurement node @@ -457,6 +517,81 @@ int TsFileIOReader::do_load_timeseries_index( return ret; } +int TsFileIOReader::do_load_all_timeseries_index( + std::vector> &index_node_entry_list, + common::PageArena &in_timeseries_index_pa, + std::vector &ts_indexs) { + int ret = E_OK; + for (const auto &index_node_entry : index_node_entry_list) { + int64_t start_offset = index_node_entry.first->offset_, + end_offset = index_node_entry.second; + std::cout << index_node_entry.first->name_ << std::endl; + const std::string target_measurement_name( + index_node_entry.first->name_.to_std_string()); + ITimeseriesIndex *ts_idx; + ret = do_load_timeseries_index(target_measurement_name, start_offset, + end_offset, in_timeseries_index_pa, + ts_idx); + if (IS_SUCC(ret)) { + ts_indexs.push_back(ts_idx); + } + } + return ret; +} + +int TsFileIOReader::get_all_leaf( + MetaIndexNode *index_node, + std::vector> &index_node_entry_list) { + int ret = E_OK; + if (index_node->node_type_ == LEAF_MEASUREMENT || + index_node->node_type_ == LEAF_DEVICE) { + for (size_t i = 0; i < index_node->children_.size(); i++) { + if (i + 1 < index_node->children_.size()) { + index_node_entry_list.push_back( + std::make_pair(index_node->children_[i], + index_node->children_[i + 1]->offset_)); + } else { + index_node_entry_list.push_back(std::make_pair( + index_node->children_[i], index_node->end_offset_)); + } + } + } else { + // reader next level index node + for (size_t i = 0; i < index_node->children_.size(); i++) { + int64_t end_offset = index_node->end_offset_; + if (i + 1 < index_node->children_.size()) { + end_offset = index_node->children_[i + 1]->offset_; + } + const int read_size = + end_offset - index_node->children_[i]->offset_; +#if DEBUG_SE + std::cout << "search_from_internal_node, end_offset=" << end_offset + << ", index_entry.offset_=" << index_entry.offset_ + << std::endl; +#endif + ASSERT(read_size > 0 && read_size < (1 << 30)); + PageArena cur_level_index_node_pa; + void *buf = cur_level_index_node_pa.alloc(sizeof(MetaIndexNode)); + char *data_buf = (char *)cur_level_index_node_pa.alloc(read_size); + if (IS_NULL(buf) || IS_NULL(data_buf)) { + return E_OOM; + } + MetaIndexNode *cur_level_index_node = + new (buf) MetaIndexNode(&cur_level_index_node_pa); + int32_t ret_read_len = 0; + if (RET_FAIL(read_file_->read(index_node->children_[i]->offset_, + data_buf, read_size, ret_read_len))) { + } else if (read_size != ret_read_len) { + ret = E_TSFILE_CORRUPTED; + } else if (RET_FAIL(cur_level_index_node->deserialize_from( + data_buf, read_size))) { + } else { + ret = get_all_leaf(cur_level_index_node, index_node_entry_list); + } + } + } + return ret; +} #if 0 int TsFileIOReader::get_next(const std::string &device_path, const std::string &measurement_name, diff --git a/cpp/src/file/tsfile_io_reader.h b/cpp/src/file/tsfile_io_reader.h index 8c26e547b..a8d8419bd 100644 --- a/cpp/src/file/tsfile_io_reader.h +++ b/cpp/src/file/tsfile_io_reader.h @@ -59,6 +59,15 @@ class TsFileIOReader { Filter *time_filter = nullptr); void revert_ssi(TsFileSeriesScanIterator *ssi); std::string get_file_path() const { return read_file_->file_path(); } + TsFileMeta *get_tsfile_meta() { + load_tsfile_meta_if_necessary(); + return &tsfile_meta_; + } + + int get_device_timeseries_meta_without_chunk_meta( + std::string device_id, + std::vector ×eries_indexs, + common::PageArena &pa); private: FORCE_INLINE int32_t file_size() const { return read_file_->file_size(); } @@ -71,10 +80,19 @@ class TsFileIOReader { const std::string &measurement_name, int64_t start_offset, int64_t end_offset, MetaIndexEntry &ret_measurement_index_entry, int64_t &ret_end_offset); + int load_all_measurement_index_entry( + int64_t start_offset, int64_t end_offset, common::PageArena &pa, + std::vector> + &ret_measurement_index_entry); int do_load_timeseries_index(const std::string &measurement_name_str, int64_t start_offset, int64_t end_offset, common::PageArena &pa, ITimeseriesIndex *&ts_index); + int do_load_all_timeseries_index( + std::vector> + &index_node_entry_list, + common::PageArena &in_timeseries_index_pa, + std::vector &ts_indexs); int load_timeseries_index_for_ssi(const std::string &device_path, const std::string &measurement_name, TsFileSeriesScanIterator *&ssi); @@ -88,6 +106,10 @@ class TsFileIOReader { int64_t &ret_end_offset); bool filter_stasify(ITimeseriesIndex *ts_index, Filter *time_filter); + int get_all_leaf(MetaIndexNode *index_node, + std::vector> + &index_node_entry_list); + private: ReadFile *read_file_; common::PageArena tsfile_meta_page_arena_; diff --git a/cpp/src/reader/expression.cc b/cpp/src/reader/expression.cc index e6002d829..123132e2b 100644 --- a/cpp/src/reader/expression.cc +++ b/cpp/src/reader/expression.cc @@ -196,12 +196,19 @@ Expression *QueryExpression::optimize(Expression *expression, void QueryExpression::destory() { for (size_t i = 0; i < my_exprs_.size(); i++) { delete my_exprs_[i]; + my_exprs_[i] = nullptr; } my_exprs_.clear(); for (size_t i = 0; i < my_filters_.size(); i++) { delete my_filters_[i]; + my_filters_[i] = nullptr; + } + if (expression_ != nullptr) { + delete expression_; + expression_ = nullptr; } my_filters_.clear(); + } } // namespace storage diff --git a/cpp/src/reader/expression.h b/cpp/src/reader/expression.h index b5e1af084..512705039 100644 --- a/cpp/src/reader/expression.h +++ b/cpp/src/reader/expression.h @@ -24,6 +24,7 @@ #include "common/db_common.h" #include "common/path.h" +#include "filter/filter.h" namespace storage { @@ -64,6 +65,21 @@ struct Expression { right_(nullptr), filter_(f), series_path_(path) {} + void destroy() { + if (filter_ != nullptr) { + delete filter_; + filter_ = nullptr; + } + if (left_ != nullptr) { + delete left_; + left_ = nullptr; + } + if (right_ != nullptr) { + delete right_; + right_ = nullptr; + } + } + ~Expression() { destroy(); } }; class QueryExpression { diff --git a/cpp/src/reader/filter/and_filter.h b/cpp/src/reader/filter/and_filter.h index b902d59cb..839826342 100644 --- a/cpp/src/reader/filter/and_filter.h +++ b/cpp/src/reader/filter/and_filter.h @@ -34,16 +34,16 @@ class AndFilter : public BinaryFilter { return left_->satisfy(statistic) && right_->satisfy(statistic); } - FORCE_INLINE bool satisfy(long time, int64_t value) { + FORCE_INLINE bool satisfy(int64_t time, int64_t value) { return left_->satisfy(time, value) && right_->satisfy(time, value); } - FORCE_INLINE bool satisfy_start_end_time(long start_time, long end_time) { + FORCE_INLINE bool satisfy_start_end_time(int64_t start_time, int64_t end_time) { return left_->satisfy_start_end_time(start_time, end_time) && right_->satisfy_start_end_time(start_time, end_time); } - FORCE_INLINE bool contain_start_end_time(long start_time, long end_time) { + FORCE_INLINE bool contain_start_end_time(int64_t start_time, int64_t end_time) { return left_->contain_start_end_time(start_time, end_time) && right_->contain_start_end_time(start_time, end_time); } diff --git a/cpp/src/reader/filter/filter.h b/cpp/src/reader/filter/filter.h index f4d15f0b7..2179201f0 100644 --- a/cpp/src/reader/filter/filter.h +++ b/cpp/src/reader/filter/filter.h @@ -37,15 +37,15 @@ class Filter { ASSERT(false); return false; } - virtual bool satisfy(long time, int64_t value) { + virtual bool satisfy(int64_t time, int64_t value) { ASSERT(false); return false; } - virtual bool satisfy_start_end_time(long start_time, long end_time) { + virtual bool satisfy_start_end_time(int64_t start_time, int64_t end_time) { ASSERT(false); return false; } - virtual bool contain_start_end_time(long start_time, long end_time) { + virtual bool contain_start_end_time(int64_t start_time, int64_t end_time) { ASSERT(false); return false; } diff --git a/cpp/src/reader/filter/or_filter.h b/cpp/src/reader/filter/or_filter.h index f0ca1aed1..f68d9d02f 100644 --- a/cpp/src/reader/filter/or_filter.h +++ b/cpp/src/reader/filter/or_filter.h @@ -34,16 +34,16 @@ class OrFilter : public BinaryFilter { return left_->satisfy(statistic) || right_->satisfy(statistic); } - FORCE_INLINE bool satisfy(long time, int64_t value) { + FORCE_INLINE bool satisfy(int64_t time, int64_t value) { return left_->satisfy(time, value) || right_->satisfy(time, value); } - FORCE_INLINE bool satisfy_start_end_time(long start_time, long end_time) { + FORCE_INLINE bool satisfy_start_end_time(int64_t start_time, int64_t end_time) { return left_->satisfy_start_end_time(start_time, end_time) || right_->satisfy_start_end_time(start_time, end_time); } - FORCE_INLINE bool contain_start_end_time(long start_time, long end_time) { + FORCE_INLINE bool contain_start_end_time(int64_t start_time, int64_t end_time) { return left_->contain_start_end_time(start_time, end_time) || right_->contain_start_end_time(start_time, end_time); } diff --git a/cpp/src/reader/filter/time_operator.cc b/cpp/src/reader/filter/time_operator.cc index c43376a69..d4b0ed6f3 100644 --- a/cpp/src/reader/filter/time_operator.cc +++ b/cpp/src/reader/filter/time_operator.cc @@ -38,11 +38,11 @@ bool TimeBetween::satisfy(Statistic *statistic) { } } -bool TimeBetween::satisfy(long time, int64_t value) { +bool TimeBetween::satisfy(int64_t time, int64_t value) { return (value1_ <= time) && (time <= value2_) ^ not_; } -bool TimeBetween::satisfy_start_end_time(long start_time, long end_time) { +bool TimeBetween::satisfy_start_end_time(int64_t start_time, int64_t end_time) { if (not_) { return start_time < value1_ || end_time > value2_; } else { @@ -50,7 +50,7 @@ bool TimeBetween::satisfy_start_end_time(long start_time, long end_time) { } } -bool TimeBetween::contain_start_end_time(long start_time, long end_time) { +bool TimeBetween::contain_start_end_time(int64_t start_time, int64_t end_time) { if (not_) { return end_time < value1_ || start_time > value2_; } else { @@ -83,18 +83,18 @@ TimeIn::~TimeIn() {} bool TimeIn::satisfy(Statistic *statistic) { return true; } -bool TimeIn::satisfy(long time, int64_t value) { +bool TimeIn::satisfy(int64_t time, int64_t value) { std::vector::iterator it = find(values_.begin(), values_.end(), time); bool result = (it != values_.end() ? true : false); return result != not_; } -bool TimeIn::satisfy_start_end_time(long start_time, long end_time) { +bool TimeIn::satisfy_start_end_time(int64_t start_time, int64_t end_time) { return true; } -bool TimeIn::contain_start_end_time(long start_time, long end_time) { +bool TimeIn::contain_start_end_time(int64_t start_time, int64_t end_time) { return true; } @@ -115,13 +115,13 @@ bool TimeEq::satisfy(Statistic *statistic) { return value_ >= statistic->start_time_ && value_ <= statistic->end_time_; } -bool TimeEq::satisfy(long time, int64_t value) { return value_ == time; } +bool TimeEq::satisfy(int64_t time, int64_t value) { return value_ == time; } -bool TimeEq::satisfy_start_end_time(long start_time, long end_time) { +bool TimeEq::satisfy_start_end_time(int64_t start_time, int64_t end_time) { return value_ <= end_time && value_ >= start_time; } -bool TimeEq::contain_start_end_time(long start_time, long end_time) { +bool TimeEq::contain_start_end_time(int64_t start_time, int64_t end_time) { return value_ == start_time && value_ == end_time; } @@ -140,13 +140,13 @@ bool TimeNotEq::satisfy(Statistic *statistic) { value_ == statistic->end_time_); } -bool TimeNotEq::satisfy(long time, int64_t value) { return !(value_ == time); } +bool TimeNotEq::satisfy(int64_t time, int64_t value) { return !(value_ == time); } -bool TimeNotEq::satisfy_start_end_time(long start_time, long end_time) { +bool TimeNotEq::satisfy_start_end_time(int64_t start_time, int64_t end_time) { return value_ != end_time && value_ != start_time; } -bool TimeNotEq::contain_start_end_time(long start_time, long end_time) { +bool TimeNotEq::contain_start_end_time(int64_t start_time, int64_t end_time) { return value_ < start_time || value_ > end_time; } @@ -175,13 +175,13 @@ bool TimeGt::satisfy(Statistic *statistic) { return value_ < statistic->end_time_; } -bool TimeGt::satisfy(long time, int64_t value) { return value_ < time; } +bool TimeGt::satisfy(int64_t time, int64_t value) { return value_ < time; } -bool TimeGt::satisfy_start_end_time(long start_time, long end_time) { +bool TimeGt::satisfy_start_end_time(int64_t start_time, int64_t end_time) { return value_ < end_time; } -bool TimeGt::contain_start_end_time(long start_time, long end_time) { +bool TimeGt::contain_start_end_time(int64_t start_time, int64_t end_time) { return value_ < start_time; } @@ -202,13 +202,13 @@ bool TimeGtEq::satisfy(Statistic *statistic) { return value_ <= statistic->end_time_; } -bool TimeGtEq::satisfy(long time, int64_t value) { return value_ <= time; } +bool TimeGtEq::satisfy(int64_t time, int64_t value) { return value_ <= time; } -bool TimeGtEq::satisfy_start_end_time(long start_time, long end_time) { +bool TimeGtEq::satisfy_start_end_time(int64_t start_time, int64_t end_time) { return value_ <= end_time; } -bool TimeGtEq::contain_start_end_time(long start_time, long end_time) { +bool TimeGtEq::contain_start_end_time(int64_t start_time, int64_t end_time) { return value_ <= start_time; } @@ -227,13 +227,13 @@ bool TimeLt::satisfy(Statistic *statistic) { return value_ > statistic->start_time_; } -bool TimeLt::satisfy(long time, int64_t value) { return value_ > time; } +bool TimeLt::satisfy(int64_t time, int64_t value) { return value_ > time; } -bool TimeLt::satisfy_start_end_time(long start_time, long end_time) { +bool TimeLt::satisfy_start_end_time(int64_t start_time, int64_t end_time) { return value_ > start_time; } -bool TimeLt::contain_start_end_time(long start_time, long end_time) { +bool TimeLt::contain_start_end_time(int64_t start_time, int64_t end_time) { return value_ > end_time; } @@ -254,13 +254,13 @@ bool TimeLtEq::satisfy(Statistic *statistic) { return value_ >= statistic->start_time_; } -bool TimeLtEq::satisfy(long time, int64_t value) { return value_ >= time; } +bool TimeLtEq::satisfy(int64_t time, int64_t value) { return value_ >= time; } -bool TimeLtEq::satisfy_start_end_time(long start_time, long end_time) { +bool TimeLtEq::satisfy_start_end_time(int64_t start_time, int64_t end_time) { return value_ >= start_time; } -bool TimeLtEq::contain_start_end_time(long start_time, long end_time) { +bool TimeLtEq::contain_start_end_time(int64_t start_time, int64_t end_time) { return value_ >= end_time; } diff --git a/cpp/src/reader/filter/time_operator.h b/cpp/src/reader/filter/time_operator.h index 8480aec33..58ee50146 100644 --- a/cpp/src/reader/filter/time_operator.h +++ b/cpp/src/reader/filter/time_operator.h @@ -38,11 +38,11 @@ class TimeBetween : public Filter { bool satisfy(Statistic *statistic); - bool satisfy(long time, int64_t value); + bool satisfy(int64_t time, int64_t value); - bool satisfy_start_end_time(long start_time, long end_time); + bool satisfy_start_end_time(int64_t start_time, int64_t end_time); - bool contain_start_end_time(long start_time, long end_time); + bool contain_start_end_time(int64_t start_time, int64_t end_time); std::vector *get_time_ranges(); FilterType get_filter_type() { return type_; } @@ -62,11 +62,11 @@ class TimeIn : public Filter { bool satisfy(Statistic *statistic); - bool satisfy(long time, int64_t value); + bool satisfy(int64_t time, int64_t value); - bool satisfy_start_end_time(long start_time, long end_time); + bool satisfy_start_end_time(int64_t start_time, int64_t end_time); - bool contain_start_end_time(long start_time, long end_time); + bool contain_start_end_time(int64_t start_time, int64_t end_time); std::vector *get_time_ranges(); @@ -85,11 +85,11 @@ class TimeEq : public Filter { bool satisfy(Statistic *statistic); - bool satisfy(long time, int64_t value); + bool satisfy(int64_t time, int64_t value); - bool satisfy_start_end_time(long start_time, long end_time); + bool satisfy_start_end_time(int64_t start_time, int64_t end_time); - bool contain_start_end_time(long start_time, long end_time); + bool contain_start_end_time(int64_t start_time, int64_t end_time); std::vector *get_time_ranges(); @@ -107,11 +107,11 @@ class TimeNotEq : public Filter { bool satisfy(Statistic *statistic); - bool satisfy(long time, int64_t value); + bool satisfy(int64_t time, int64_t value); - bool satisfy_start_end_time(long start_time, long end_time); + bool satisfy_start_end_time(int64_t start_time, int64_t end_time); - bool contain_start_end_time(long start_time, long end_time); + bool contain_start_end_time(int64_t start_time, int64_t end_time); std::vector *get_time_ranges(); FilterType get_filter_type() { return type_; } @@ -128,11 +128,11 @@ class TimeGt : public Filter { bool satisfy(Statistic *statistic); - bool satisfy(long time, int64_t value); + bool satisfy(int64_t time, int64_t value); - bool satisfy_start_end_time(long start_time, long end_time); + bool satisfy_start_end_time(int64_t start_time, int64_t end_time); - bool contain_start_end_time(long start_time, long end_time); + bool contain_start_end_time(int64_t start_time, int64_t end_time); std::vector *get_time_ranges(); @@ -150,11 +150,11 @@ class TimeGtEq : public Filter { bool satisfy(Statistic *statistic); - bool satisfy(long time, int64_t value); + bool satisfy(int64_t time, int64_t value); - bool satisfy_start_end_time(long start_time, long end_time); + bool satisfy_start_end_time(int64_t start_time, int64_t end_time); - bool contain_start_end_time(long start_time, long end_time); + bool contain_start_end_time(int64_t start_time, int64_t end_time); std::vector *get_time_ranges(); void reset_value(int64_t val) { value_ = val; } @@ -172,11 +172,11 @@ class TimeLt : public Filter { bool satisfy(Statistic *statistic); - bool satisfy(long time, int64_t value); + bool satisfy(int64_t time, int64_t value); - bool satisfy_start_end_time(long start_time, long end_time); + bool satisfy_start_end_time(int64_t start_time, int64_t end_time); - bool contain_start_end_time(long start_time, long end_time); + bool contain_start_end_time(int64_t start_time, int64_t end_time); std::vector *get_time_ranges(); @@ -194,11 +194,11 @@ class TimeLtEq : public Filter { bool satisfy(Statistic *statistic); - bool satisfy(long time, int64_t value); + bool satisfy(int64_t time, int64_t value); - bool satisfy_start_end_time(long start_time, long end_time); + bool satisfy_start_end_time(int64_t start_time, int64_t end_time); - bool contain_start_end_time(long start_time, long end_time); + bool contain_start_end_time(int64_t start_time, int64_t end_time); std::vector *get_time_ranges(); FilterType get_filter_type() { return type_; } diff --git a/cpp/src/reader/qds_with_timegenerator.cc b/cpp/src/reader/qds_with_timegenerator.cc index df42a329d..fd39ad300 100644 --- a/cpp/src/reader/qds_with_timegenerator.cc +++ b/cpp/src/reader/qds_with_timegenerator.cc @@ -287,17 +287,25 @@ int QDSWithTimeGenerator::init(TsFileIOReader *io_reader, QueryExpression *qe) { io_reader_ = io_reader; qe_ = qe; std::vector paths = qe_->selected_series_; - + std::vector column_names; + std::vector data_types; + column_names.reserve(paths.size()); + data_types.reserve(paths.size()); + for (const auto &path : paths) { + column_names.push_back(path.full_path_); + } for (size_t i = 0; i < paths.size(); i++) { ValueAt va; + index_lookup_.insert({paths[i].measurement_, i}); if (RET_FAIL(io_reader_->alloc_ssi(paths[i].device_, paths[i].measurement_, va.ssi_))) { } else { va.io_reader_ = io_reader_; + data_types.push_back(va.value_col_iter_->get_data_type()); value_at_vec_.push_back(va); } } - + result_set_metadata_ = new ResultSetMetadata(column_names, data_types); row_record_ = new RowRecord(value_at_vec_.size()); tree_ = construct_node_tree(qe->expression_); return E_OK; @@ -313,11 +321,15 @@ void destroy_node(Node *node) { delete node; } -void QDSWithTimeGenerator::destroy() { +void QDSWithTimeGenerator::close() { if (row_record_ != nullptr) { delete row_record_; row_record_ = nullptr; } + if (result_set_metadata_ != nullptr) { + delete result_set_metadata_; + result_set_metadata_ = nullptr; + } if (tree_ != nullptr) { destroy_node(tree_); tree_ = nullptr; @@ -325,16 +337,20 @@ void QDSWithTimeGenerator::destroy() { for (size_t i = 0; i < value_at_vec_.size(); i++) { value_at_vec_[i].destroy(); } + if (qe_ != nullptr) { + delete qe_; + qe_ = nullptr; + } value_at_vec_.clear(); } -RowRecord *QDSWithTimeGenerator::get_next() { +bool QDSWithTimeGenerator::next() { if (tree_ == nullptr) { - return nullptr; + return false; } int64_t timestamp = tree_->get_cur_timestamp(); if (timestamp == INVALID_NEXT_TIMESTAMP) { - return nullptr; + return false; } row_record_->set_timestamp(timestamp); #if DEBUG_SE @@ -352,9 +368,27 @@ RowRecord *QDSWithTimeGenerator::get_next() { #if DEBUG_SE std::cout << "\n\n" << std::endl; #endif - return row_record_; + return true; } +bool QDSWithTimeGenerator::is_null(const std::string &column_name) { + auto iter = index_lookup_.find(column_name); + if (iter == index_lookup_.end()) { + return true; + } else { + return is_null(iter->second); + } +} + +bool QDSWithTimeGenerator::is_null(uint32_t column_index) { + return row_record_->get_field(column_index) == nullptr; +} + +RowRecord *QDSWithTimeGenerator::get_row_record() { return row_record_; } + +ResultSetMetadata *QDSWithTimeGenerator::get_metadata() { + return result_set_metadata_; +} Node *QDSWithTimeGenerator::construct_node_tree(Expression *expr) { if (expr->type_ == AND_EXPR || expr->type_ == OR_EXPR) { Node *root = nullptr; diff --git a/cpp/src/reader/qds_with_timegenerator.h b/cpp/src/reader/qds_with_timegenerator.h index 3afafcf2f..43c367424 100644 --- a/cpp/src/reader/qds_with_timegenerator.h +++ b/cpp/src/reader/qds_with_timegenerator.h @@ -21,8 +21,8 @@ #include "common/db_common.h" #include "expression.h" -#include "query_data_set.h" #include "reader/tsfile_series_scan_iterator.h" +#include "result_set.h" namespace storage { @@ -106,25 +106,31 @@ struct Node { void next_timestamp(int64_t beyond_this_time); }; -class QDSWithTimeGenerator : public QueryDataSet { +class QDSWithTimeGenerator : public ResultSet { public: QDSWithTimeGenerator() : row_record_(nullptr), + result_set_metadata_(nullptr), io_reader_(nullptr), qe_(nullptr), tree_(nullptr), value_at_vec_() {} - ~QDSWithTimeGenerator() { destroy(); } + ~QDSWithTimeGenerator() { close(); } int init(TsFileIOReader *io_reader, QueryExpression *qe); - void destroy(); - RowRecord *get_next(); + void close(); + bool next(); + bool is_null(const std::string &column_name); + bool is_null(uint32_t column_index); + RowRecord *get_row_record(); + ResultSetMetadata *get_metadata(); private: Node *construct_node_tree(Expression *expr); private: RowRecord *row_record_; + ResultSetMetadata *result_set_metadata_; TsFileIOReader *io_reader_; QueryExpression *qe_; Node *tree_; diff --git a/cpp/src/reader/qds_without_timegenerator.cc b/cpp/src/reader/qds_without_timegenerator.cc index af7f24293..1fac9990a 100644 --- a/cpp/src/reader/qds_without_timegenerator.cc +++ b/cpp/src/reader/qds_without_timegenerator.cc @@ -34,6 +34,10 @@ int QDSWithoutTimeGenerator::init(TsFileIOReader *io_reader, std::vector paths = qe_->selected_series_; size_t origin_path_count = paths.size(); std::vector valid_paths; + std::vector column_names; + std::vector data_types; + column_names.reserve(origin_path_count); + data_types.reserve(origin_path_count); Expression *global_time_expression = qe->expression_; Filter *global_time_filter = nullptr; if (global_time_expression != nullptr) { @@ -46,8 +50,10 @@ int QDSWithoutTimeGenerator::init(TsFileIOReader *io_reader, if (ret != 0) { return ret; } else { + index_lookup_.insert({paths[i].measurement_, i}); ssi_vec_.push_back(ssi); valid_paths.push_back(paths[i]); + column_names.push_back(paths[i].full_path_); } } @@ -59,15 +65,21 @@ int QDSWithoutTimeGenerator::init(TsFileIOReader *io_reader, for (size_t i = 0; i < path_count; i++) { get_next_tsblock(i, true); + data_types.push_back(value_iters_[i]->get_data_type()); } + result_set_metadata_ = new ResultSetMetadata(column_names, data_types); return E_OK; // ignore invalid timeseries } -void QDSWithoutTimeGenerator::destroy() { +void QDSWithoutTimeGenerator::close() { if (row_record_ != nullptr) { delete row_record_; row_record_ = nullptr; } + if (result_set_metadata_ != nullptr) { + delete result_set_metadata_; + result_set_metadata_ = nullptr; + } for (size_t i = 0; i < time_iters_.size(); i++) { delete time_iters_[i]; time_iters_[i] = nullptr; @@ -89,12 +101,16 @@ void QDSWithoutTimeGenerator::destroy() { io_reader_->revert_ssi(ssi); } ssi_vec_.clear(); + if (qe_ != nullptr) { + delete qe_; + qe_ = nullptr; + } } -RowRecord *QDSWithoutTimeGenerator::get_next() { +bool QDSWithoutTimeGenerator::next() { row_record_->reset(); if (heap_time_.size() == 0) { - return nullptr; + return false; } int64_t time = heap_time_.begin()->first; row_record_->set_timestamp(time); @@ -118,7 +134,26 @@ RowRecord *QDSWithoutTimeGenerator::get_next() { iter++; // cppcheck-suppress postfixOperator heap_time_.erase(cur); } - return row_record_; + return true; +} + +bool QDSWithoutTimeGenerator::is_null(const std::string &column_name) { + auto iter = index_lookup_.find(column_name); + if (iter == index_lookup_.end()) { + return true; + } else { + return is_null(iter->second); + } +} + +bool QDSWithoutTimeGenerator::is_null(uint32_t column_index) { + return row_record_->get_field(column_index) == nullptr; +} + +RowRecord *QDSWithoutTimeGenerator::get_row_record() { return row_record_; } + +ResultSetMetadata *QDSWithoutTimeGenerator::get_metadata() { + return result_set_metadata_; } int QDSWithoutTimeGenerator::get_next_tsblock(uint32_t index, bool alloc_mem) { diff --git a/cpp/src/reader/qds_without_timegenerator.h b/cpp/src/reader/qds_without_timegenerator.h index 8b18caa06..b41655269 100644 --- a/cpp/src/reader/qds_without_timegenerator.h +++ b/cpp/src/reader/qds_without_timegenerator.h @@ -25,14 +25,15 @@ #include "expression.h" #include "file/tsfile_io_reader.h" -#include "query_data_set.h" +#include "result_set.h" namespace storage { -class QDSWithoutTimeGenerator : public QueryDataSet { +class QDSWithoutTimeGenerator : public ResultSet { public: QDSWithoutTimeGenerator() : row_record_(nullptr), + result_set_metadata_(nullptr), io_reader_(nullptr), qe_(nullptr), ssi_vec_(), @@ -40,16 +41,21 @@ class QDSWithoutTimeGenerator : public QueryDataSet { time_iters_(), value_iters_(), heap_time_() {} - ~QDSWithoutTimeGenerator() { destroy(); } + ~QDSWithoutTimeGenerator() { close(); } int init(TsFileIOReader *io_reader, QueryExpression *qe); - void destroy(); - RowRecord *get_next(); + void close(); + bool next(); + bool is_null(const std::string &column_name); + bool is_null(uint32_t column_index); + RowRecord *get_row_record(); + ResultSetMetadata *get_metadata(); private: int get_next_tsblock(uint32_t index, bool alloc_mem); private: RowRecord *row_record_; + ResultSetMetadata *result_set_metadata_; TsFileIOReader *io_reader_; QueryExpression *qe_; std::vector ssi_vec_; diff --git a/cpp/src/reader/query_data_set.h b/cpp/src/reader/query_data_set.h deleted file mode 100644 index 858427660..000000000 --- a/cpp/src/reader/query_data_set.h +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * License); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#ifndef READER_QUERY_DATA_SET_H -#define READER_QUERY_DATA_SET_H - -#include "common/row_record.h" - -namespace storage { - -class QueryDataSet { - public: - QueryDataSet() {} - virtual ~QueryDataSet() {} - virtual RowRecord *get_next() = 0; -}; - -} // namespace storage - -#endif // READER_QUERY_DATA_SET_H diff --git a/cpp/src/reader/result_set.h b/cpp/src/reader/result_set.h new file mode 100644 index 000000000..767c95860 --- /dev/null +++ b/cpp/src/reader/result_set.h @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#ifndef READER_QUERY_DATA_SET_H +#define READER_QUERY_DATA_SET_H + +#include + +#include "common/row_record.h" + +namespace storage { + +class ResultSetMetadata { + public: + ResultSetMetadata(const std::vector& column_names, + const std::vector& column_types) + : column_names_(column_names), column_types_(column_types) {} + common::TSDataType get_column_type(uint32_t column_index) { + ASSERT(column_index >= 0 && column_index < column_types_.size()); + return column_types_[column_index]; + } + std::string get_column_name(uint32_t column_index) { + ASSERT(column_index >= 0 && column_index < column_names_.size()); + return column_names_[column_index]; + } + + private: + std::vector column_names_; + std::vector column_types_; +}; + +class ResultSet { + public: + ResultSet() {} + virtual ~ResultSet() {} + virtual bool next() = 0; + virtual bool is_null(const std::string& column_name) = 0; + virtual bool is_null(uint32_t column_index) = 0; + + template + T get_value(const std::string& column_name) { + RowRecord* row_record = get_row_record(); + ASSERT(index_lookup_.count(column_name)); + uint32_t index = index_lookup_[column_name]; + ASSERT(index >= 0 && index < row_record->get_col_num()); + return row_record->get_field(index)->get_value(); + } + template + T get_value(uint32_t column_index) { + RowRecord* row_record = get_row_record(); + ASSERT(column_index >= 0 && column_index < row_record->get_col_num()); + return row_record->get_field(column_index)->get_value(); + } + virtual RowRecord* get_row_record() = 0; + virtual ResultSetMetadata* get_metadata() = 0; + virtual void close() = 0; + + protected: + std::unordered_map index_lookup_; +}; + +} // namespace storage + +#endif // READER_QUERY_DATA_SET_H diff --git a/cpp/src/reader/tsfile_executor.cc b/cpp/src/reader/tsfile_executor.cc index 757a8e235..71c03f0ee 100644 --- a/cpp/src/reader/tsfile_executor.cc +++ b/cpp/src/reader/tsfile_executor.cc @@ -58,11 +58,10 @@ int TsFileExecutor::init(const std::string &file_path) { return ret; } -int TsFileExecutor::execute(QueryExpression *query_expr, - QueryDataSet *&ret_qds) { +int TsFileExecutor::execute(QueryExpression *query_expr, ResultSet *&ret_qds) { ASSERT(is_inited_); query_exprs_ = query_expr; - std::vector paths = query_expr->selected_series_; + std::vector paths = query_exprs_->selected_series_; Expression *origin_expr = query_exprs_->expression_; Expression *regular_expr = nullptr; if (query_exprs_->has_filter_) { @@ -88,7 +87,7 @@ int TsFileExecutor::execute(QueryExpression *query_expr, } int TsFileExecutor::execute_may_with_global_timefilter(QueryExpression *qe, - QueryDataSet *&ret_qds) { + ResultSet *&ret_qds) { int ret = E_OK; QDSWithoutTimeGenerator *qds = new QDSWithoutTimeGenerator; ret = qds->init(&io_reader_, qe); @@ -101,7 +100,7 @@ int TsFileExecutor::execute_may_with_global_timefilter(QueryExpression *qe, } int TsFileExecutor::execute_with_timegenerator(QueryExpression *qe, - QueryDataSet *&ret_qds) { + ResultSet *&ret_qds) { int ret = E_OK; QDSWithTimeGenerator *qds = new QDSWithTimeGenerator; ret = qds->init(&io_reader_, qe); @@ -113,6 +112,6 @@ int TsFileExecutor::execute_with_timegenerator(QueryExpression *qe, return ret; } -void TsFileExecutor::destroy_query_data_set(QueryDataSet *qds) { delete qds; } +void TsFileExecutor::destroy_query_data_set(ResultSet *qds) { delete qds; } } // namespace storage diff --git a/cpp/src/reader/tsfile_executor.h b/cpp/src/reader/tsfile_executor.h index f768c4b4a..a87dba2b4 100644 --- a/cpp/src/reader/tsfile_executor.h +++ b/cpp/src/reader/tsfile_executor.h @@ -22,8 +22,8 @@ #include #include "file/read_file.h" -#include "query_data_set.h" #include "query_executor.h" +#include "result_set.h" namespace storage { @@ -34,13 +34,15 @@ class TsFileExecutor // : public QueryExecutor ~TsFileExecutor(); int init(ReadFile *read_file); int init(const std::string &file_path); - int execute(QueryExpression *query_expr, QueryDataSet *&ret_qds); - void destroy_query_data_set(QueryDataSet *qds); + int execute(QueryExpression *query_expr, ResultSet *&ret_qds); + void destroy_query_data_set(ResultSet *qds); + TsFileMeta *get_tsfile_meta() { return io_reader_.get_tsfile_meta(); } + TsFileIOReader *get_tsfile_io_reader() { return &io_reader_; } private: int execute_may_with_global_timefilter(QueryExpression *qe, - QueryDataSet *&ret_qds); - int execute_with_timegenerator(QueryExpression *qe, QueryDataSet *&ret_qds); + ResultSet *&ret_qds); + int execute_with_timegenerator(QueryExpression *qe, ResultSet *&ret_qds); private: TsFileIOReader io_reader_; diff --git a/cpp/src/reader/tsfile_reader.cc b/cpp/src/reader/tsfile_reader.cc index 3b73b4f3f..7104493b3 100644 --- a/cpp/src/reader/tsfile_reader.cc +++ b/cpp/src/reader/tsfile_reader.cc @@ -18,6 +18,8 @@ */ #include "tsfile_reader.h" +#include "common/schema.h" +#include "filter/time_operator.h" #include "tsfile_executor.h" using namespace common; @@ -27,7 +29,22 @@ namespace storage { TsFileReader::TsFileReader() : read_file_(nullptr), tsfile_executor_(nullptr) {} -TsFileReader::~TsFileReader() { +TsFileReader::~TsFileReader() { close(); } + +int TsFileReader::open(const std::string &file_path) { + int ret = E_OK; + read_file_ = new storage::ReadFile; + tsfile_executor_ = new storage::TsFileExecutor(); + if (RET_FAIL(read_file_->open(file_path))) { + std::cout << "filed to open file " << ret << std::endl; + } else if (RET_FAIL(tsfile_executor_->init(read_file_))) { + std::cout << "filed to init " << ret << std::endl; + } + return ret; +} + +int TsFileReader::close() { + int ret = E_OK; if (tsfile_executor_ != nullptr) { delete tsfile_executor_; tsfile_executor_ = nullptr; @@ -37,29 +54,74 @@ TsFileReader::~TsFileReader() { delete read_file_; read_file_ = nullptr; } + return ret; // TO DO } -int TsFileReader::open(const std::string &file_path) { +int TsFileReader::query(QueryExpression *qe, ResultSet *&ret_qds) { + return tsfile_executor_->execute(qe, ret_qds); +} + +int TsFileReader::query(std::vector &path_list, int64_t start_time, + int64_t end_time, ResultSet *&result_set) { int ret = E_OK; - read_file_ = new storage::ReadFile; - tsfile_executor_ = new storage::TsFileExecutor(); - if (RET_FAIL(read_file_->open(file_path))) { - std::cout << "filed to open file " << ret << std::endl; - } else if (RET_FAIL(tsfile_executor_->init(read_file_))) { - std::cout << "filed to init " << ret << std::endl; + Filter *time_filter = new TimeBetween(start_time, end_time, false); + Expression *exp = + new storage::Expression(storage::GLOBALTIME_EXPR, time_filter); + std::vector path_list_vec; + for (const auto &path : path_list) { + uint32_t last_point_pos = path.find_last_of('.'); + if (last_point_pos <= 0) { + return E_INVALID_PATH; + } + std::string device_name = path.substr(0, last_point_pos); + std::string measurement_name = + path.substr(last_point_pos + 1, path.size() - last_point_pos); + path_list_vec.emplace_back(Path(device_name, measurement_name)); } + QueryExpression *query_expression = + QueryExpression::create(path_list_vec, exp); + ret = tsfile_executor_->execute(query_expression, result_set); return ret; } -int TsFileReader::query(QueryExpression *qe, QueryDataSet *&ret_qds) { - return tsfile_executor_->execute(qe, ret_qds); +void TsFileReader::destroy_query_data_set(storage::ResultSet *qds) { + tsfile_executor_->destroy_query_data_set(qds); } -void TsFileReader::destroy_query_data_set(storage::QueryDataSet *qds) { - tsfile_executor_->destroy_query_data_set(qds); +std::vector TsFileReader::get_all_devices() { + TsFileMeta *tsfile_meta = tsfile_executor_->get_tsfile_meta(); + std::vector device_ids; + if (tsfile_meta != nullptr) { + device_ids.reserve(tsfile_meta->index_node_->children_.size()); + for (const auto &meta_index_entry : + tsfile_meta->index_node_->children_) { + device_ids.push_back(meta_index_entry->name_.to_std_string()); + } + } + return device_ids; +} + +int TsFileReader::get_timeseries_schema( + const std::string &device_id, std::vector &result) { + int ret = E_OK; + std::vector timeseries_indexs; + PageArena pa; + pa.init(512, MOD_TSFILE_READER); + if (RET_FAIL(tsfile_executor_->get_tsfile_io_reader() + ->get_device_timeseries_meta_without_chunk_meta( + device_id, timeseries_indexs, pa))) { + } else { + for (auto timeseries_index : timeseries_indexs) { + MeasurementSchema ms( + timeseries_index->get_measurement_name().to_std_string(), + timeseries_index->get_data_type()); + result.push_back(ms); + } + } + return E_OK; } -QueryDataSet *TsFileReader::read_timeseries( +ResultSet *TsFileReader::read_timeseries( const std::string &device_name, std::vector measurement_name) { return nullptr; } diff --git a/cpp/src/reader/tsfile_reader.h b/cpp/src/reader/tsfile_reader.h index 3d90ffc46..1f67f1b1c 100644 --- a/cpp/src/reader/tsfile_reader.h +++ b/cpp/src/reader/tsfile_reader.h @@ -27,7 +27,8 @@ namespace storage { class TsFileExecutor; class ReadFile; -class QueryDataSet; +class ResultSet; +struct MeasurementSchema; } // namespace storage namespace storage { @@ -40,10 +41,16 @@ class TsFileReader { TsFileReader(); ~TsFileReader(); int open(const std::string &file_path); - int query(storage::QueryExpression *qe, QueryDataSet *&ret_qds); - void destroy_query_data_set(QueryDataSet *qds); - QueryDataSet *read_timeseries(const std::string &device_name, - std::vector measurement_name); + int close(); + int query(storage::QueryExpression *qe, ResultSet *&ret_qds); + int query(std::vector &path_list, int64_t start_time, + int64_t end_time, ResultSet *&result_set); + void destroy_query_data_set(ResultSet *qds); + ResultSet *read_timeseries(const std::string &device_name, + std::vector measurement_name); + std::vector get_all_devices(); + int get_timeseries_schema(const std::string &device_id, + std::vector &result); private: storage::ReadFile *read_file_; diff --git a/cpp/src/writer/time_page_writer.cc b/cpp/src/writer/time_page_writer.cc index 215b0b766..8093a76fa 100644 --- a/cpp/src/writer/time_page_writer.cc +++ b/cpp/src/writer/time_page_writer.cc @@ -70,7 +70,8 @@ int TimePageData::init(ByteStream &time_bs, Compressor *compressor) { int TimePageWriter::init(TSEncoding encoding, CompressionType compression) { int ret = E_OK; - if (nullptr == (time_encoder_ = EncoderFactory::alloc_time_encoder())) { + if (nullptr == + (time_encoder_ = EncoderFactory::alloc_time_encoder(encoding))) { ret = E_OOM; } else if (nullptr == (statistic_ = StatisticFactory::alloc_statistic( common::VECTOR))) { diff --git a/cpp/src/writer/tsfile_writer.cc b/cpp/src/writer/tsfile_writer.cc index 8cc0b6d4d..61b0510bb 100644 --- a/cpp/src/writer/tsfile_writer.cc +++ b/cpp/src/writer/tsfile_writer.cc @@ -130,23 +130,24 @@ int TsFileWriter::open(const std::string &file_path, int flags, mode_t mode) { return ret; } +int TsFileWriter::open(const std::string &file_path) { + return open(file_path, O_RDWR | O_CREAT | O_TRUNC, 0666); +} + int TsFileWriter::register_aligned_timeseries( - const std::string &device_path, const std::string &measurement_name, - common::TSDataType data_type, common::TSEncoding encoding, - common::CompressionType compression_type) { - MeasurementSchema *ms = new MeasurementSchema(measurement_name, data_type, - encoding, compression_type); - return register_timeseries(device_path, ms, true); + const std::string &device_id, const MeasurementSchema &measurement_schema) { + MeasurementSchema *ms = new MeasurementSchema( + measurement_schema.measurement_name_, measurement_schema.data_type_, + measurement_schema.encoding_, measurement_schema.compression_type_); + return register_timeseries(device_id, ms, true); } int TsFileWriter::register_aligned_timeseries( - const std::string &device_path, - const std::vector &measurement_schema_vec) { + const std::string &device_id, + const std::vector &measurement_schemas) { int ret = E_OK; - std::vector::const_iterator it = - measurement_schema_vec.begin(); - for (; it != measurement_schema_vec.end(); it++) { - ret = register_timeseries(device_path, *it, true); + for (auto it : measurement_schemas) { + ret = register_timeseries(device_id, it, true); if (ret != E_OK) { return ret; } @@ -155,18 +156,17 @@ int TsFileWriter::register_aligned_timeseries( } int TsFileWriter::register_timeseries( - const std::string &device_path, const std::string &measurement_name, - common::TSDataType data_type, common::TSEncoding encoding, - common::CompressionType compression_type) { - MeasurementSchema *ms = new MeasurementSchema(measurement_name, data_type, - encoding, compression_type); - return register_timeseries(device_path, ms); + const std::string &device_id, const MeasurementSchema &measurement_schema) { + MeasurementSchema *ms = new MeasurementSchema( + measurement_schema.measurement_name_, measurement_schema.data_type_, + measurement_schema.encoding_, measurement_schema.compression_type_); + return register_timeseries(device_id, ms, false); } -int TsFileWriter::register_timeseries(const std::string &device_path, +int TsFileWriter::register_timeseries(const std::string &device_id, MeasurementSchema *measurement_schema, bool is_aligned) { - DeviceSchemaIter device_iter = schemas_.find(device_path); + DeviceSchemaIter device_iter = schemas_.find(device_id); if (device_iter != schemas_.end()) { MeasurementSchemaMap &msm = device_iter->second->measurement_schema_map_; @@ -180,20 +180,20 @@ int TsFileWriter::register_timeseries(const std::string &device_path, ms_group->is_aligned_ = is_aligned; ms_group->measurement_schema_map_.insert(std::make_pair( measurement_schema->measurement_name_, measurement_schema)); - schemas_.insert(std::make_pair(device_path, ms_group)); + schemas_.insert(std::make_pair(device_id, ms_group)); } return E_OK; } int TsFileWriter::register_timeseries( - const std::string &device_path, + const std::string &device_id, const std::vector &measurement_schema_vec) { int ret = E_OK; std::vector::const_iterator it = measurement_schema_vec.begin(); for (; it != measurement_schema_vec.end(); it++) { // cppcheck-suppress postfixOperator - ret = register_timeseries(device_path, *it); + ret = register_timeseries(device_id, *it); if (ret != E_OK) { return ret; } @@ -410,8 +410,8 @@ int TsFileWriter::write_record(const TsRecord &record) { // std::vector chunk_writers; SimpleVector chunk_writers; MeasurementNamesFromRecord mnames_getter(record); - if (RET_FAIL(do_check_schema(record.device_name_, mnames_getter, - chunk_writers))) { + if (RET_FAIL( + do_check_schema(record.device_id_, mnames_getter, chunk_writers))) { return ret; } @@ -435,7 +435,7 @@ int TsFileWriter::write_record_aligned(const TsRecord &record) { SimpleVector value_chunk_writers; TimeChunkWriter *time_chunk_writer; MeasurementNamesFromRecord mnames_getter(record); - if (RET_FAIL(do_check_schema_aligned(record.device_name_, mnames_getter, + if (RET_FAIL(do_check_schema_aligned(record.device_id_, mnames_getter, time_chunk_writer, value_chunk_writers))) { return ret; @@ -509,7 +509,7 @@ int TsFileWriter::write_tablet_aligned(const Tablet &tablet) { SimpleVector value_chunk_writers; TimeChunkWriter *time_chunk_writer = nullptr; MeasurementNamesFromTablet mnames_getter(tablet); - if (RET_FAIL(do_check_schema_aligned(tablet.device_name_, mnames_getter, + if (RET_FAIL(do_check_schema_aligned(tablet.device_id_, mnames_getter, time_chunk_writer, value_chunk_writers))) { return ret; @@ -529,8 +529,8 @@ int TsFileWriter::write_tablet(const Tablet &tablet) { int ret = E_OK; SimpleVector chunk_writers; MeasurementNamesFromTablet mnames_getter(tablet); - if (RET_FAIL(do_check_schema(tablet.device_name_, mnames_getter, - chunk_writers))) { + if (RET_FAIL( + do_check_schema(tablet.device_id_, mnames_getter, chunk_writers))) { return ret; } ASSERT(chunk_writers.size() == tablet.get_column_count()); @@ -543,7 +543,7 @@ int TsFileWriter::write_tablet(const Tablet &tablet) { write_column(chunk_writer, tablet, c); } - record_count_since_last_flush_ += tablet.max_rows_; + record_count_since_last_flush_ += tablet.max_row_num_; ret = check_memory_size_and_may_flush_chunks(); return ret; } @@ -556,7 +556,7 @@ int TsFileWriter::write_column(ChunkWriter *chunk_writer, const Tablet &tablet, int64_t *timestamps = tablet.timestamps_; void *col_values = tablet.value_matrix_[col_idx]; BitMap &col_notnull_bitmap = tablet.bitmaps_[col_idx]; - int32_t row_count = tablet.max_rows_; + int32_t row_count = tablet.max_row_num_; if (data_type == common::BOOLEAN) { ret = write_typed_column(chunk_writer, timestamps, (bool *)col_values, @@ -589,7 +589,7 @@ int TsFileWriter::value_write_column(ValueChunkWriter *value_chunk_writer, int64_t *timestamps = tablet.timestamps_; void *col_values = tablet.value_matrix_[col_idx]; BitMap &col_notnull_bitmap = tablet.bitmaps_[col_idx]; - int32_t row_count = tablet.max_rows_; + int32_t row_count = tablet.max_row_num_; if (data_type == common::BOOLEAN) { ret = write_typed_column(value_chunk_writer, timestamps, diff --git a/cpp/src/writer/tsfile_writer.h b/cpp/src/writer/tsfile_writer.h index 7b43b850f..d1b3b96b8 100644 --- a/cpp/src/writer/tsfile_writer.h +++ b/cpp/src/writer/tsfile_writer.h @@ -50,21 +50,17 @@ class TsFileWriter { void destroy(); int open(const std::string &file_path, int flags, mode_t mode); + int open(const std::string &file_path); int init(storage::WriteFile *write_file); - int register_timeseries(const std::string &device_path, - const std::string &measurement_name, - common::TSDataType data_type, - common::TSEncoding encoding, - common::CompressionType compression_type); - int register_aligned_timeseries(const std::string &device_path, - const std::string &measurement_name, - common::TSDataType data_type, - common::TSEncoding encoding, - common::CompressionType compression_type); + int register_timeseries(const std::string &device_id, + const MeasurementSchema &measurement_schema); int register_aligned_timeseries( - const std::string &device_path, - const std::vector &measurement_schema_vec); + const std::string &device_id, + const MeasurementSchema &measurement_schema); + int register_aligned_timeseries( + const std::string &device_id, + const std::vector &measurement_schemas); int write_record(const TsRecord &record); int write_tablet(const Tablet &tablet); int write_record_aligned(const TsRecord &record); @@ -130,11 +126,11 @@ class TsFileWriter { // std::vector &chunk_writers); int write_column(storage::ChunkWriter *chunk_writer, const Tablet &tablet, int col_idx); - int register_timeseries(const std::string &device_path, + int register_timeseries(const std::string &device_id, MeasurementSchema *measurement_schema, bool is_aligned = false); int register_timeseries( - const std::string &device_path, + const std::string &device_id, const std::vector &measurement_schema_vec); private: diff --git a/cpp/test/CMakeLists.txt b/cpp/test/CMakeLists.txt index f5c42572d..d17b90109 100644 --- a/cpp/test/CMakeLists.txt +++ b/cpp/test/CMakeLists.txt @@ -72,6 +72,7 @@ file(GLOB_RECURSE TEST_SRCS "utils/*_test.cc" "file/*_test.cc" "compress/*_test.cc" + "reader/*_test.cc" "writer/*_test.cc" ) if (${COV_ENABLED}) @@ -80,7 +81,7 @@ if (${COV_ENABLED}) endif () if(NOT WIN32) -# enable address sanitizer default +#enable address sanitizer default set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsanitize=address -g") endif() diff --git a/cpp/test/common/record_test.cc b/cpp/test/common/record_test.cc index 211b2235d..3a5c67bb6 100644 --- a/cpp/test/common/record_test.cc +++ b/cpp/test/common/record_test.cc @@ -90,21 +90,20 @@ TEST(DataPointTest, SetDouble) { TEST(TsRecordTest, ConstructorWithDeviceName) { TsRecord ts_record("device1"); - EXPECT_EQ(ts_record.device_name_, "device1"); + EXPECT_EQ(ts_record.device_id_, "device1"); EXPECT_EQ(ts_record.points_.size(), 0); } TEST(TsRecordTest, ConstructorWithTimestamp) { TsRecord ts_record(1625140800, "device1", 5); EXPECT_EQ(ts_record.timestamp_, 1625140800); - EXPECT_EQ(ts_record.device_name_, "device1"); + EXPECT_EQ(ts_record.device_id_, "device1"); EXPECT_EQ(ts_record.points_.capacity(), 5); } -TEST(TsRecordTest, AppendDataPoint) { +TEST(TsRecordTest, AddPoint) { TsRecord ts_record("device1"); - DataPoint dp("temperature", 36.6); - ts_record.append_data_point(dp); + ts_record.add_point("temperature", 36.6); ASSERT_EQ(ts_record.points_.size(), 1); EXPECT_EQ(ts_record.points_[0].measurement_name_, "temperature"); EXPECT_EQ(ts_record.points_[0].data_type_, common::DOUBLE); @@ -114,8 +113,7 @@ TEST(TsRecordTest, AppendDataPoint) { TEST(TsRecordTest, LargeQuantities) { TsRecord ts_record("device1"); for (int i = 0; i < 10000; i++) { - DataPoint dp(std::to_string(i), 36.6); - ts_record.append_data_point(dp); + ts_record.add_point(std::to_string(i), 36.6); } ASSERT_EQ(ts_record.points_.size(), 10000); diff --git a/cpp/test/common/tablet_test.cc b/cpp/test/common/tablet_test.cc index 0ebf4619e..bd795bcfe 100644 --- a/cpp/test/common/tablet_test.cc +++ b/cpp/test/common/tablet_test.cc @@ -34,17 +34,18 @@ TEST(TabletTest, BasicFunctionality) { schema_vec.push_back(MeasurementSchema( "measurement2", common::TSDataType::BOOLEAN, common::TSEncoding::RLE, common::CompressionType::SNAPPY)); - Tablet tablet(device_name, &schema_vec); + Tablet tablet(device_name, + std::make_shared>(schema_vec)); EXPECT_EQ(tablet.get_column_count(), schema_vec.size()); EXPECT_EQ(tablet.init(), common::E_OK); - EXPECT_EQ(tablet.set_value(0, "measurement1", true), common::E_OK); - EXPECT_EQ(tablet.set_value(0, "measurement2", false), common::E_OK); + EXPECT_EQ(tablet.add_value(0, "measurement1", true), common::E_OK); + EXPECT_EQ(tablet.add_value(0, "measurement2", false), common::E_OK); - EXPECT_EQ(tablet.set_value(1, 0, false), common::E_OK); - EXPECT_EQ(tablet.set_value(1, 1, true), common::E_OK); + EXPECT_EQ(tablet.add_value(1, 0, false), common::E_OK); + EXPECT_EQ(tablet.add_value(1, 1, true), common::E_OK); } TEST(TabletTest, LargeQuantities) { @@ -56,7 +57,8 @@ TEST(TabletTest, LargeQuantities) { "measurement" + std::to_string(i), common::TSDataType::BOOLEAN, common::TSEncoding::RLE, common::CompressionType::SNAPPY)); } - Tablet tablet(device_name, &schema_vec); + Tablet tablet(device_name, + std::make_shared>(schema_vec)); EXPECT_EQ(tablet.get_column_count(), schema_vec.size()); } diff --git a/cpp/test/reader/tsfile_reader_test.cc b/cpp/test/reader/tsfile_reader_test.cc new file mode 100644 index 000000000..4d4857920 --- /dev/null +++ b/cpp/test/reader/tsfile_reader_test.cc @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * License); you may not use this file except in compliance + * with the License. You may obtain a copy of the License a + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include "reader/tsfile_reader.h" + +#include + +#include +#include + +#include "common/path.h" +#include "common/record.h" +#include "common/schema.h" +#include "common/tablet.h" +#include "file/tsfile_io_writer.h" +#include "file/write_file.h" +#include "reader/qds_without_timegenerator.h" +#include "writer/tsfile_writer.h" + +using namespace storage; +using namespace common; + +class TsFileReaderTest : public ::testing::Test { + protected: + void SetUp() override { + tsfile_writer_ = new TsFileWriter(); + libtsfile_init(); + file_name_ = std::string("tsfile_writer_test_") + + generate_random_string(10) + std::string(".tsfile"); + remove(file_name_.c_str()); + int flags = O_WRONLY | O_CREAT | O_TRUNC; +#ifdef _WIN32 + flags |= O_BINARY; +#endif + mode_t mode = 0666; + EXPECT_EQ(tsfile_writer_->open(file_name_, flags, mode), common::E_OK); + } + void TearDown() override { + delete tsfile_writer_; + remove(file_name_.c_str()); + } + + std::string file_name_; + TsFileWriter *tsfile_writer_ = nullptr; + + public: + static std::string generate_random_string(int length) { + std::random_device rd; + std::mt19937 gen(rd()); + std::uniform_int_distribution<> dis(0, 61); + + const std::string chars = + "0123456789" + "abcdefghijklmnopqrstuvwxyz" + "ABCDEFGHIJKLMNOPQRSTUVWXYZ"; + + std::string random_string; + + for (int i = 0; i < length; ++i) { + random_string += chars[dis(gen)]; + } + + return random_string; + } + + static std::string field_to_string(storage::Field *value) { + if (value->type_ == common::TEXT) { + return std::string(value->value_.sval_); + } else { + std::stringstream ss; + switch (value->type_) { + case common::BOOLEAN: + ss << (value->value_.bval_ ? "true" : "false"); + break; + case common::INT32: + ss << value->value_.ival_; + break; + case common::INT64: + ss << value->value_.lval_; + break; + case common::FLOAT: + ss << value->value_.fval_; + break; + case common::DOUBLE: + ss << value->value_.dval_; + break; + case common::NULL_TYPE: + ss << "NULL"; + break; + default: + ASSERT(false); + break; + } + return ss.str(); + } + } +}; + +TEST_F(TsFileReaderTest, ResultSetMetadata) { + std::string device_path = "device1"; + std::string measurement_name = "temperature"; + common::TSDataType data_type = common::TSDataType::INT32; + common::TSEncoding encoding = common::TSEncoding::PLAIN; + common::CompressionType compression_type = + common::CompressionType::UNCOMPRESSED; + tsfile_writer_->register_timeseries( + device_path, storage::MeasurementSchema(measurement_name, data_type, + encoding, compression_type)); + + for (int i = 0; i < 50000; ++i) { + TsRecord record(1622505600000 + i * 1000, device_path); + record.add_point(measurement_name, (int32_t)i); + ASSERT_EQ(tsfile_writer_->write_record(record), E_OK); + ASSERT_EQ(tsfile_writer_->flush(), E_OK); + } + ASSERT_EQ(tsfile_writer_->close(), E_OK); + + std::vector select_list = {"device1.temperature"}; + + storage::TsFileReader reader; + int ret = reader.open(file_name_); + ASSERT_EQ(ret, common::E_OK); + storage::ResultSet *tmp_qds = nullptr; + + ret = reader.query(select_list, 1622505600000, 1622505600000 + 50000 * 1000, + tmp_qds); + auto *qds = (QDSWithoutTimeGenerator *)tmp_qds; + + ResultSetMetadata *result_set_metadaa = qds->get_metadata(); + ASSERT_EQ(result_set_metadaa->get_column_type(0), data_type); + ASSERT_EQ(result_set_metadaa->get_column_name(0), + device_path + "." + measurement_name); + reader.destroy_query_data_set(qds); + reader.close(); +} + +TEST_F(TsFileReaderTest, GetAllDevice) { + std::vector device_path = {"device", "device.ln"}; + std::vector measurement_name = {"temperature", "humidity"}; + common::TSDataType data_type = common::TSDataType::INT32; + common::TSEncoding encoding = common::TSEncoding::PLAIN; + common::CompressionType compression_type = + common::CompressionType::UNCOMPRESSED; + tsfile_writer_->register_timeseries( + device_path[0], + storage::MeasurementSchema(measurement_name[0], data_type, encoding, + compression_type)); + tsfile_writer_->register_timeseries( + device_path[1], + storage::MeasurementSchema(measurement_name[1], data_type, encoding, + compression_type)); + TsRecord record_0(1622505600000, device_path[0]); + record_0.add_point(measurement_name[0], (int32_t)0); + TsRecord record_1(1622505600000, device_path[1]); + record_1.add_point(measurement_name[1], (int32_t)1); + ASSERT_EQ(tsfile_writer_->write_record(record_0), E_OK); + ASSERT_EQ(tsfile_writer_->write_record(record_1), E_OK); + ASSERT_EQ(tsfile_writer_->flush(), E_OK); + ASSERT_EQ(tsfile_writer_->close(), E_OK); + + storage::TsFileReader reader; + int ret = reader.open(file_name_); + ASSERT_EQ(ret, common::E_OK); + std::vector devices = reader.get_all_devices(); + ASSERT_EQ(devices.size(), 2); + ASSERT_EQ(devices[0], device_path[0]); + ASSERT_EQ(devices[1], device_path[1]); +} + +TEST_F(TsFileReaderTest, GetTimeseriesSchema) { + std::vector device_path = {"device", "device.ln"}; + std::vector measurement_name = {"temperature", "humidity"}; + common::TSDataType data_type = common::TSDataType::INT32; + common::TSEncoding encoding = common::TSEncoding::PLAIN; + common::CompressionType compression_type = + common::CompressionType::UNCOMPRESSED; + tsfile_writer_->register_timeseries( + device_path[0], + storage::MeasurementSchema(measurement_name[0], data_type, encoding, + compression_type)); + tsfile_writer_->register_timeseries( + device_path[1], + storage::MeasurementSchema(measurement_name[1], data_type, encoding, + compression_type)); + TsRecord record_0(1622505600000, device_path[0]); + record_0.add_point(measurement_name[0], (int32_t)0); + TsRecord record_1(1622505600000, device_path[1]); + record_1.add_point(measurement_name[1], (int32_t)1); + ASSERT_EQ(tsfile_writer_->write_record(record_0), E_OK); + ASSERT_EQ(tsfile_writer_->write_record(record_1), E_OK); + ASSERT_EQ(tsfile_writer_->flush(), E_OK); + ASSERT_EQ(tsfile_writer_->close(), E_OK); + + storage::TsFileReader reader; + int ret = reader.open(file_name_); + ASSERT_EQ(ret, common::E_OK); + std::vector measurement_schemas; + reader.get_timeseries_schema(device_path[0], measurement_schemas); + ASSERT_EQ(measurement_schemas[0].measurement_name_, measurement_name[0]); + ASSERT_EQ(measurement_schemas[0].data_type_, TSDataType::INT32); + + reader.get_timeseries_schema(device_path[1], measurement_schemas); + ASSERT_EQ(measurement_schemas[1].measurement_name_, measurement_name[1]); + ASSERT_EQ(measurement_schemas[1].data_type_, TSDataType::INT32); + reader.close(); +} \ No newline at end of file diff --git a/cpp/test/writer/tsfile_writer_test.cc b/cpp/test/writer/tsfile_writer_test.cc index 6b6011265..9d51078f5 100644 --- a/cpp/test/writer/tsfile_writer_test.cc +++ b/cpp/test/writer/tsfile_writer_test.cc @@ -127,9 +127,10 @@ TEST_F(TsFileWriterTest, WriteDiffDataType) { for (uint32_t i = 0; i < measurement_names.size(); i++) { std::string measurement_name = measurement_names[i]; common::TSDataType data_type = data_types[i]; - tsfile_writer_->register_timeseries(device_name, measurement_name, - data_type, encoding, - compression_type); + tsfile_writer_->register_timeseries( + device_name, + storage::MeasurementSchema(measurement_name, data_type, encoding, + compression_type)); } int row_num = 1000; @@ -140,19 +141,16 @@ TEST_F(TsFileWriterTest, WriteDiffDataType) { common::TSDataType data_type = data_types[j]; switch (data_type) { case BOOLEAN: - record.append_data_point(DataPoint(measurement_name, true)); + record.add_point(measurement_name, true); break; case INT64: - record.append_data_point( - DataPoint(measurement_name, (int64_t)415412)); + record.add_point(measurement_name, (int64_t)415412); break; case FLOAT: - record.append_data_point( - DataPoint(measurement_name, (float)1.0)); + record.add_point(measurement_name, (float)1.0); break; case DOUBLE: - record.append_data_point( - DataPoint(measurement_name, (double)2.0)); + record.add_point(measurement_name, (double)2.0); break; default: break; @@ -163,35 +161,43 @@ TEST_F(TsFileWriterTest, WriteDiffDataType) { ASSERT_EQ(tsfile_writer_->flush(), E_OK); ASSERT_EQ(tsfile_writer_->close(), E_OK); - std::vector select_list; + std::vector select_list; + select_list.reserve(measurement_names.size()); for (uint32_t i = 0; i < measurement_names.size(); ++i) { std::string measurement_name = measurement_names[i]; - storage::Path path(device_name, measurement_name); - select_list.push_back(path); + std::string path_name = device_name + "." + measurement_name; + select_list.emplace_back(path_name); } - storage::QueryExpression *query_expr = - storage::QueryExpression::create(select_list, nullptr); storage::TsFileReader reader; int ret = reader.open(file_name_); ASSERT_EQ(ret, common::E_OK); - storage::QueryDataSet *tmp_qds = nullptr; + storage::ResultSet *tmp_qds = nullptr; - ret = reader.query(query_expr, tmp_qds); + ret = reader.query(select_list, 1622505600000, + 1622505600000 + row_num * 100, tmp_qds); auto *qds = (QDSWithoutTimeGenerator *)tmp_qds; - storage::RowRecord *record; int64_t cur_record_num = 0; do { - record = qds->get_next(); - if (!record) { + if (!qds->next()) { break; } cur_record_num++; + ASSERT_EQ(qds->get_value(0), (float)1.0); + ASSERT_EQ(qds->get_value(1), (int64_t)415412); + ASSERT_EQ(qds->get_value(2), true); + ASSERT_EQ(qds->get_value(3), (double)2.0); + + ASSERT_EQ(qds->get_value(measurement_names[0]), (float)1.0); + ASSERT_EQ(qds->get_value(measurement_names[1]), + (int64_t)415412); + ASSERT_EQ(qds->get_value(measurement_names[2]), true); + ASSERT_EQ(qds->get_value(measurement_names[3]), (double)2.0); } while (true); EXPECT_EQ(cur_record_num, row_num); - storage::QueryExpression::destory(query_expr); reader.destroy_query_data_set(qds); + reader.close(); } TEST_F(TsFileWriterTest, RegisterTimeSeries) { @@ -202,9 +208,10 @@ TEST_F(TsFileWriterTest, RegisterTimeSeries) { common::CompressionType compression_type = common::CompressionType::UNCOMPRESSED; - ASSERT_EQ(tsfile_writer_->register_timeseries(device_path, measurement_name, - data_type, encoding, - compression_type), + ASSERT_EQ(tsfile_writer_->register_timeseries( + device_path, + storage::MeasurementSchema(measurement_name, data_type, + encoding, compression_type)), E_OK); } @@ -215,13 +222,13 @@ TEST_F(TsFileWriterTest, WriteMultipleRecords) { common::TSEncoding encoding = common::TSEncoding::PLAIN; common::CompressionType compression_type = common::CompressionType::UNCOMPRESSED; - tsfile_writer_->register_timeseries(device_path, measurement_name, - data_type, encoding, compression_type); + tsfile_writer_->register_timeseries( + device_path, storage::MeasurementSchema(measurement_name, data_type, + encoding, compression_type)); for (int i = 0; i < 50000; ++i) { TsRecord record(1622505600000 + i * 1000, device_path); - DataPoint point(measurement_name, (int32_t)i); - record.append_data_point(point); + record.add_point(measurement_name, (int32_t)i); ASSERT_EQ(tsfile_writer_->write_record(record), E_OK); ASSERT_EQ(tsfile_writer_->flush(), E_OK); } @@ -243,20 +250,24 @@ TEST_F(TsFileWriterTest, WriteMultipleTabletsMultiFlush) { common::TSEncoding::PLAIN, common::CompressionType::UNCOMPRESSED); tsfile_writer_->register_timeseries( - device_name, measure_name, common::TSDataType::INT32, - common::TSEncoding::PLAIN, - common::CompressionType::UNCOMPRESSED); + device_name, storage::MeasurementSchema( + measure_name, common::TSDataType::INT32, + common::TSEncoding::PLAIN, + common::CompressionType::UNCOMPRESSED)); } } for (int tablet_num = 0; tablet_num < max_tablet_num; tablet_num++) { for (int i = 0; i < device_num; i++) { std::string device_name = "test_device" + std::to_string(i); - Tablet tablet(device_name, &schema_vecs[i], 1); + Tablet tablet(device_name, + std::make_shared>( + schema_vecs[i]), + 1); tablet.init(); for (int j = 0; j < measurement_num; j++) { - tablet.set_timestamp(0, 16225600000 + tablet_num * 100); - tablet.set_value(0, j, static_cast(tablet_num)); + tablet.add_timestamp(0, 16225600000 + tablet_num * 100); + tablet.add_value(0, j, static_cast(tablet_num)); } ASSERT_EQ(tsfile_writer_->write_tablet(tablet), E_OK); } @@ -279,7 +290,7 @@ TEST_F(TsFileWriterTest, WriteMultipleTabletsMultiFlush) { storage::TsFileReader reader; int ret = reader.open(file_name_); ASSERT_EQ(ret, common::E_OK); - storage::QueryDataSet *tmp_qds = nullptr; + storage::ResultSet *tmp_qds = nullptr; ret = reader.query(query_expr, tmp_qds); auto *qds = (QDSWithoutTimeGenerator *)tmp_qds; @@ -287,17 +298,16 @@ TEST_F(TsFileWriterTest, WriteMultipleTabletsMultiFlush) { storage::RowRecord *record; int max_rows = max_tablet_num * 1; for (int cur_row = 0; cur_row < max_rows; cur_row++) { - record = qds->get_next(); - if (!record) { + if (!qds->next()) { break; } + record = qds->get_row_record(); int size = record->get_fields()->size(); for (int i = 0; i < size; ++i) { EXPECT_EQ(std::to_string(cur_row), field_to_string(record->get_field(i))); } } - storage::QueryExpression::destory(query_expr); reader.destroy_query_data_set(qds); } @@ -315,23 +325,27 @@ TEST_F(TsFileWriterTest, WriteMultipleTabletsInt64) { common::TSEncoding::PLAIN, common::CompressionType::UNCOMPRESSED)); tsfile_writer_->register_timeseries( - device_name, measure_name, common::TSDataType::INT64, - common::TSEncoding::PLAIN, - common::CompressionType::UNCOMPRESSED); + device_name, storage::MeasurementSchema( + measure_name, common::TSDataType::INT64, + common::TSEncoding::PLAIN, + common::CompressionType::UNCOMPRESSED)); } } for (int i = 0; i < device_num; i++) { std::string device_name = "test_device" + std::to_string(i); int max_rows = 100; - Tablet tablet(device_name, &schema_vec[i], max_rows); + Tablet tablet( + device_name, + std::make_shared>(schema_vec[i]), + max_rows); tablet.init(); for (int j = 0; j < measurement_num; j++) { for (int row = 0; row < max_rows; row++) { - tablet.set_timestamp(row, 16225600 + row); + tablet.add_timestamp(row, 16225600 + row); } for (int row = 0; row < max_rows; row++) { - tablet.set_value(row, j, static_cast(row)); + tablet.add_value(row, j, static_cast(row)); } } ASSERT_EQ(tsfile_writer_->write_tablet(tablet), E_OK); @@ -355,23 +369,27 @@ TEST_F(TsFileWriterTest, WriteMultipleTabletsDouble) { common::TSEncoding::PLAIN, common::CompressionType::UNCOMPRESSED)); tsfile_writer_->register_timeseries( - device_name, measure_name, common::TSDataType::DOUBLE, - common::TSEncoding::PLAIN, - common::CompressionType::UNCOMPRESSED); + device_name, storage::MeasurementSchema( + measure_name, common::TSDataType::DOUBLE, + common::TSEncoding::PLAIN, + common::CompressionType::UNCOMPRESSED)); } } for (int i = 0; i < device_num; i++) { std::string device_name = "test_device" + std::to_string(i); int max_rows = 200; - Tablet tablet(device_name, &schema_vec[i], max_rows); + Tablet tablet( + device_name, + std::make_shared>(schema_vec[i]), + max_rows); tablet.init(); for (int j = 0; j < measurement_num; j++) { for (int row = 0; row < max_rows; row++) { - tablet.set_timestamp(row, 16225600 + row); + tablet.add_timestamp(row, 16225600 + row); } for (int row = 0; row < max_rows; row++) { - tablet.set_value(row, j, static_cast(row) + 1.0); + tablet.add_value(row, j, static_cast(row) + 1.0); } } ASSERT_EQ(tsfile_writer_->write_tablet(tablet), E_OK); @@ -397,22 +415,22 @@ TEST_F(TsFileWriterTest, FlushMultipleDevice) { common::TSEncoding::PLAIN, common::CompressionType::UNCOMPRESSED)); tsfile_writer_->register_timeseries( - device_name, measure_name, common::TSDataType::INT64, + device_name, MeasurementSchema(measure_name, common::TSDataType::INT64, common::TSEncoding::PLAIN, - common::CompressionType::UNCOMPRESSED); + common::CompressionType::UNCOMPRESSED)); } } for (int i = 0; i < device_num; i++) { std::string device_name = "test_device" + std::to_string(i); - Tablet tablet(device_name, &schema_vec[i], max_rows); + Tablet tablet(device_name, std::make_shared>(schema_vec[i]), max_rows); tablet.init(); for (int j = 0; j < measurement_num; j++) { for (int row = 0; row < max_rows; row++) { - tablet.set_timestamp(row, 16225600 + row); + tablet.add_timestamp(row, 16225600 + row); } for (int row = 0; row < max_rows; row++) { - tablet.set_value(row, j, static_cast(row)); + tablet.add_value(row, j, static_cast(row)); } } ASSERT_EQ(tsfile_writer_->write_tablet(tablet), E_OK); @@ -436,7 +454,7 @@ TEST_F(TsFileWriterTest, FlushMultipleDevice) { storage::TsFileReader reader; int ret = reader.open(file_name_); ASSERT_EQ(ret, common::E_OK); - storage::QueryDataSet *tmp_qds = nullptr; + storage::ResultSet *tmp_qds = nullptr; ret = reader.query(query_expr, tmp_qds); auto *qds = (QDSWithoutTimeGenerator *)tmp_qds; @@ -444,7 +462,10 @@ TEST_F(TsFileWriterTest, FlushMultipleDevice) { storage::RowRecord *record; int64_t cur_record_num = 0; do { - record = qds->get_next(); + if (!qds->next()) { + break; + } + record = qds->get_row_record(); // if empty chunk is writen, the timestamp should be NULL if (!record) { break; @@ -453,7 +474,6 @@ TEST_F(TsFileWriterTest, FlushMultipleDevice) { cur_record_num++; } while (true); EXPECT_EQ(cur_record_num, max_rows); - storage::QueryExpression::destory(query_expr); reader.destroy_query_data_set(qds); } @@ -472,22 +492,22 @@ TEST_F(TsFileWriterTest, AnalyzeTsfileForload) { common::TSEncoding::PLAIN, common::CompressionType::UNCOMPRESSED)); tsfile_writer_->register_timeseries( - device_name, measure_name, common::TSDataType::INT64, + device_name, MeasurementSchema(measure_name, common::TSDataType::INT64, common::TSEncoding::PLAIN, - common::CompressionType::UNCOMPRESSED); + common::CompressionType::UNCOMPRESSED)); } } for (int i = 0; i < device_num; i++) { std::string device_name = "test_device" + std::to_string(i); - Tablet tablet(device_name, &schema_vec[i], max_rows); + Tablet tablet(device_name, std::make_shared>(schema_vec[i]), max_rows); tablet.init(); for (int j = 0; j < measurement_num; j++) { for (int row = 0; row < max_rows; row++) { - tablet.set_timestamp(row, 16225600 + row); + tablet.add_timestamp(row, 16225600 + row); } for (int row = 0; row < max_rows; row++) { - tablet.set_value(row, j, static_cast(row)); + tablet.add_value(row, j, static_cast(row)); } } ASSERT_EQ(tsfile_writer_->write_tablet(tablet), E_OK); @@ -511,9 +531,10 @@ TEST_F(TsFileWriterTest, FlushWithoutWriteAfterRegisterTS) { common::CompressionType compression_type = common::CompressionType::UNCOMPRESSED; - ASSERT_EQ(tsfile_writer_->register_timeseries(device_path, measurement_name, - data_type, encoding, - compression_type), + ASSERT_EQ(tsfile_writer_->register_timeseries( + device_path, + storage::MeasurementSchema(measurement_name, data_type, + encoding, compression_type)), E_OK); ASSERT_EQ(tsfile_writer_->flush(), E_OK); ASSERT_EQ(tsfile_writer_->close(), E_OK); @@ -543,8 +564,7 @@ TEST_F(TsFileWriterTest, WriteAlignedTimeseries) { for (int i = 0; i < row_num; ++i) { TsRecord record(1622505600000 + i * 1000, device_name); for (const auto &measurement_name : measurement_names) { - DataPoint point(measurement_name, (int32_t)i); - record.append_data_point(point); + record.add_point(measurement_name, (int32_t)i); } ASSERT_EQ(tsfile_writer_->write_record_aligned(record), E_OK); } @@ -564,24 +584,23 @@ TEST_F(TsFileWriterTest, WriteAlignedTimeseries) { storage::TsFileReader reader; int ret = reader.open(file_name_); ASSERT_EQ(ret, common::E_OK); - storage::QueryDataSet *tmp_qds = nullptr; + storage::ResultSet *tmp_qds = nullptr; ret = reader.query(query_expr, tmp_qds); auto *qds = (QDSWithoutTimeGenerator *)tmp_qds; storage::RowRecord *record; for (int cur_row = 0; cur_row < row_num; cur_row++) { - record = qds->get_next(); - if (!record) { + if (!qds->next()) { break; } + record = qds->get_row_record(); int size = record->get_fields()->size(); for (int i = 0; i < size; ++i) { EXPECT_EQ(std::to_string(cur_row), field_to_string(record->get_field(i))); } } - storage::QueryExpression::destory(query_expr); reader.destroy_query_data_set(qds); } @@ -609,8 +628,7 @@ TEST_F(TsFileWriterTest, WriteAlignedMultiFlush) { for (int i = 0; i < row_num; ++i) { TsRecord record(1622505600000 + i * 1000, device_name); for (const auto &measurement_name : measurement_names) { - DataPoint point(measurement_name, (int32_t)i); - record.append_data_point(point); + record.add_point(measurement_name, (int32_t)i); } ASSERT_EQ(tsfile_writer_->write_record_aligned(record), E_OK); ASSERT_EQ(tsfile_writer_->flush(), E_OK); @@ -630,24 +648,23 @@ TEST_F(TsFileWriterTest, WriteAlignedMultiFlush) { storage::TsFileReader reader; int ret = reader.open(file_name_); ASSERT_EQ(ret, common::E_OK); - storage::QueryDataSet *tmp_qds = nullptr; + storage::ResultSet *tmp_qds = nullptr; ret = reader.query(query_expr, tmp_qds); auto *qds = (QDSWithoutTimeGenerator *)tmp_qds; storage::RowRecord *record; for (int cur_row = 0; cur_row < row_num; cur_row++) { - record = qds->get_next(); - if (!record) { + if (!qds->next()) { break; } + record = qds->get_row_record(); int size = record->get_fields()->size(); for (int i = 0; i < size; ++i) { EXPECT_EQ(std::to_string(cur_row), field_to_string(record->get_field(i))); } } - storage::QueryExpression::destory(query_expr); reader.destroy_query_data_set(qds); } @@ -675,11 +692,7 @@ TEST_F(TsFileWriterTest, WriteAlignedPartialData) { for (int i = 0; i < row_num; ++i) { TsRecord record(1622505600000 + i * 1000, device_name); for (const auto &measurement_name : measurement_names) { - DataPoint point(measurement_name, (int32_t)i); - if (i % 2 == 0) { - point.isnull = true; - } - record.append_data_point(point); + record.add_point(measurement_name, (int32_t)i); } ASSERT_EQ(tsfile_writer_->write_record_aligned(record), E_OK); } @@ -698,25 +711,24 @@ TEST_F(TsFileWriterTest, WriteAlignedPartialData) { storage::TsFileReader reader; int ret = reader.open(file_name_); ASSERT_EQ(ret, common::E_OK); - storage::QueryDataSet *tmp_qds = nullptr; + storage::ResultSet *tmp_qds = nullptr; ret = reader.query(query_expr, tmp_qds); auto *qds = (QDSWithoutTimeGenerator *)tmp_qds; storage::RowRecord *record; - int64_t cur_row = 1; + int64_t cur_row = 0; do { - record = qds->get_next(); - if (!record) { + if (!qds->next()) { break; } + record = qds->get_row_record(); int size = record->get_fields()->size(); for (int i = 0; i < size; ++i) { EXPECT_EQ(std::to_string(cur_row), field_to_string(record->get_field(i))); } - cur_row += 2; + cur_row++; } while (true); - storage::QueryExpression::destory(query_expr); reader.destroy_query_data_set(qds); } \ No newline at end of file