Skip to content

Commit

Permalink
Enable address sanitizer in tsfile cpp. (#214)
Browse files Browse the repository at this point in the history
* Enable asan in tsfile cpp.

* cpp: Fix memory leak.

* cpp: Fix MeasurementSchemaGroup test error.

* rename table_size to tablet_size in the example.
  • Loading branch information
ColinLeeo authored Sep 6, 2024
1 parent afba9f3 commit cd418aa
Show file tree
Hide file tree
Showing 48 changed files with 320 additions and 176 deletions.
1 change: 1 addition & 0 deletions cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,4 @@ add_subdirectory(test)
if(TESTS_ENABLED)
add_dependencies(TsFile_Test tsfile)
endif()

1 change: 0 additions & 1 deletion cpp/bench_mark/bench_mark_src/bench_conf.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,3 @@ int THREAD_NUM = 1;
int TIMESERIES_NUM = 50;
std::vector<int> TYPE_LIST = {0, 0, 1, 0, 1};
} // namespace bench

2 changes: 0 additions & 2 deletions cpp/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use_cpp11=1
enable_cov=0
debug_se=0
run_cov_only=0
env_for_cyber=0

shell_dir=$(cd "$(dirname "$0")";pwd)

Expand Down Expand Up @@ -127,7 +126,6 @@ cmake ../../ \
-DCMAKE_BUILD_TYPE=$build_type \
-DUSE_CPP11=$use_cpp11 \
-DENABLE_COV=$enable_cov \
-DENABLE_ASAN=$enable_asan \
-DDEBUG_SE=$debug_se \
-DBUILD_TSFILE_ONLY=$build_tsfile_only

Expand Down
1 change: 0 additions & 1 deletion cpp/examples/c_examples/c_examples.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,3 @@ ErrorCode read_tsfile();
#ifdef __cplusplus
}
#endif

1 change: 1 addition & 0 deletions cpp/examples/cpp_examples/cpp_examples.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "common/path.h"
#include "common/record.h"
#include "common/row_record.h"
#include "common/schema.h"
#include "reader/expression.h"
#include "reader/filter/filter.h"
#include "reader/qds_with_timegenerator.h"
Expand Down
112 changes: 85 additions & 27 deletions cpp/examples/cpp_examples/demo_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,39 +20,97 @@
#include <time.h>

#include <iostream>
#include <random>
#include <string>

#include "cpp_examples.h"

using namespace storage;

/** Returns the current calendar time, in seconds, as reported by time(nullptr). */
long getNowTime() {
    const time_t now = time(nullptr);
    return now;
}

/**
 * Builds a string of `length` characters, each picked at random from the
 * 62-character alphabet [0-9a-zA-Z].
 *
 * A new std::mt19937 engine is seeded from std::random_device on every call.
 * A zero or negative `length` produces an empty string.
 */
static std::string generate_random_string(int length) {
    const std::string alphabet =
        "0123456789"
        "abcdefghijklmnopqrstuvwxyz"
        "ABCDEFGHIJKLMNOPQRSTUVWXYZ";

    std::random_device seed_source;
    std::mt19937 engine(seed_source());
    // Indices 0..61 address every character of the alphabet above.
    std::uniform_int_distribution<> pick_index(0, 61);

    std::string result;
    for (int left = length; left > 0; --left) {
        result.push_back(alphabet[pick_index(engine)]);
    }
    return result;
}

int demo_write() {
storage::TsFileWriter tsfile_writer;
std::string device_name = "root.db001.dev001";
std::string measurement_name = "m001";
storage::libtsfile_init();
int ret = tsfile_writer.open("cpp_rw.tsfile", O_CREAT | O_RDWR, 0644);
ASSERT(ret == 0);
ret = tsfile_writer.register_timeseries(device_name, measurement_name,
common::INT32, common::PLAIN,
common::UNCOMPRESSED);
ASSERT(ret == 0);
std::cout << "get open ret: " << ret << std::endl;

int row_count = 100;
for (int i = 1; i < row_count; ++i) {
storage::DataPoint point(measurement_name, 10000 + i);
storage::TsRecord record(i, device_name, 1);
record.points_.push_back(point);
ret = tsfile_writer.write_record(record);
ASSERT(ret == 0);
TsFileWriter* tsfile_writer_ = new TsFileWriter();
libtsfile_init();
std::string file_name_ = std::string("tsfile_writer_test_") +
generate_random_string(10) +
std::string(".tsfile");
int flags = O_WRONLY | O_CREAT | O_TRUNC;
#ifdef _WIN32
flags |= O_BINARY;
#endif
mode_t mode = 0666;
tsfile_writer_->open(file_name_, flags, mode);
remove(file_name_.c_str());
const int device_num = 50;
const int measurement_num = 50;
std::vector<MeasurementSchema> schema_vec[50];
for (int i = 0; i < device_num; i++) {
std::string device_name = "test_device" + std::to_string(i);
for (int j = 0; j < measurement_num; j++) {
std::string measure_name = "measurement" + std::to_string(j);
schema_vec[i].push_back(
MeasurementSchema(measure_name, common::TSDataType::INT32,
common::TSEncoding::PLAIN,
common::CompressionType::UNCOMPRESSED));
tsfile_writer_->register_timeseries(
device_name, measure_name, common::TSDataType::INT32,
common::TSEncoding::PLAIN,
common::CompressionType::UNCOMPRESSED);
}
}

std::cout << "input tablet size" << std::endl;
int tablet_size;
std::cin >> tablet_size;

int max_rows = 100000;
int cur_row = 0;
long start = getNowTime();
for (; cur_row < max_rows;) {
if (cur_row + tablet_size > max_rows) {
tablet_size = max_rows - cur_row;
}
for (int i = 0; i < device_num; i++) {
std::string device_name = "test_device" + std::to_string(i);
Tablet tablet(device_name, &schema_vec[i], tablet_size);
tablet.init();
for (int row = 0; row < tablet_size; row++) {
tablet.set_timestamp(row, 16225600 + cur_row + row);
}
for (int j = 0; j < measurement_num; j++) {
for (int row = 0; row < tablet_size; row++) {
tablet.set_value(row, j, row + cur_row);
}
}
tsfile_writer_->write_tablet(tablet);
tsfile_writer_->flush();
}
cur_row += tablet_size;
std::cout << "finish writing " << cur_row << " rows" << std::endl;
}

tsfile_writer.flush();
std::cout << "finish flush" << std::endl;
tsfile_writer.close();
std::cout << "tsfile closed." << std::endl;
storage::libtsfile_destroy();
std::cout << "tsfile to destory." << std::endl;
std::cout << "finish writing" << std::endl;
std::cout << "will close our files" << std::endl;
tsfile_writer_->close();
long end = getNowTime();
printf("interval waitForResults is %ld \n", end - start);
return 0;
}
2 changes: 1 addition & 1 deletion cpp/src/common/allocator/byte_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -671,7 +671,7 @@ FORCE_INLINE int merge_byte_stream(ByteStream &sea, ByteStream &river,
}

FORCE_INLINE int copy_bs_to_buf(ByteStream &bs, char *src_buf,
uint32_t src_buf_len) {
uint32_t src_buf_len) {
ByteStream::BufferIterator buf_iter = bs.init_buffer_iterator();
uint32_t copyed_len = 0;
while (true) {
Expand Down
9 changes: 5 additions & 4 deletions cpp/src/common/allocator/my_string.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,11 @@ struct String {
memcpy(buf_, str.c_str(), len_);
return common::E_OK;
}

FORCE_INLINE bool operator==(const String &other) const {
return equal_to(other);
}

FORCE_INLINE int dup_from(const String &str, common::PageArena &pa) {
len_ = str.len_;
if (UNLIKELY(len_ == 0)) {
Expand Down Expand Up @@ -143,10 +148,6 @@ struct String {
return this->len_ < other.len_;
}

bool operator==(const String &that) const {
return equal_to(that);
}

#ifndef NDEBUG
friend std::ostream &operator<<(std::ostream &os, const String &s) {
os << s.len_ << "@";
Expand Down
7 changes: 0 additions & 7 deletions cpp/src/common/config/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,13 @@ typedef struct ConfigValue {
uint32_t
tsblock_mem_inc_step_size_; // tsblock memory self-increment step size
uint32_t tsblock_max_memory_; // the maximum memory of a single tsblock
const char *rest_service_ip_;
int32_t rest_service_port_;
WALFlushPolicy wal_flush_policy_;
uint32_t seqtvlist_primary_array_size_;
uint32_t seqtvlist_max_record_count_;
uint32_t page_writer_max_point_num_;
uint32_t page_writer_max_memory_bytes_;
uint32_t max_degree_of_index_node_;
double tsfile_index_bloom_filter_error_percent_;
const char *tsfile_prefix_path_;
TSEncoding time_encoding_type_;
TSDataType time_data_type_;
CompressionType time_compress_type_;
uint32_t memtable_flusher_poll_interval_seconds_;
int32_t chunk_group_size_threshold_;
int32_t record_count_for_next_mem_check_;
} ConfigValue;
Expand Down
33 changes: 29 additions & 4 deletions cpp/src/common/datatype/value.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,41 @@
namespace common {

struct Value {
Value(TSDataType type) : type_(type), value_{0} {}
Value(TSDataType type) : type_(type) {
switch (type) {
case BOOLEAN:
value_.bval_ = false;
break;
case INT32:
value_.ival_ = 0;
break;
case INT64:
value_.lval_ = 0;
break;
case FLOAT:
value_.fval_ = 0.0f;
break;
case DOUBLE:
value_.dval_ = 0.0;
break;
case TEXT:
value_.sval_ = nullptr;
break;
case NULL_TYPE:
break;
default:
LOGE("unknown data type");
}
}

~Value() {
if (is_type(NULL_TYPE) && value_.sval_) {
if (is_type(TEXT) && value_.sval_) {
free(value_.sval_);
}
}

FORCE_INLINE void free_memory() {
if (is_type(NULL_TYPE) && value_.sval_) {
if (is_type(TEXT) && value_.sval_) {
free(value_.sval_);
value_.sval_ = nullptr;
}
Expand Down Expand Up @@ -121,7 +146,7 @@ FORCE_INLINE Value *make_literal(double val) {

FORCE_INLINE Value *make_literal(char *string) {
Value *value = new Value(TEXT);
value->value_.sval_ = string;
value->value_.sval_ = strdup(string);
return value;
}

Expand Down
16 changes: 0 additions & 16 deletions cpp/src/common/global.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,35 +31,19 @@ namespace common {
ColumnDesc g_time_column_desc;
ConfigValue g_config_value_;

// TODO move to server.cc ?
void init_config_value() {
g_config_value_.tsblock_mem_inc_step_size_ = 8000; // 8k
g_config_value_.tsblock_max_memory_ = 64000; // 64k
// g_config_value_.tsblock_max_memory_ = 32;
g_config_value_.rest_service_ip_ = "0.0.0.0";

char* timechodb_port = getenv("TIMECHODB_PORT");
if (nullptr == timechodb_port) {
g_config_value_.rest_service_port_ = 8899;
} else {
g_config_value_.rest_service_port_ = atoi(timechodb_port);
}

g_config_value_.wal_flush_policy_ = WAL_DISABLED;
g_config_value_.seqtvlist_primary_array_size_ = 32; // 32;
g_config_value_.seqtvlist_max_record_count_ = 1024; // 64;
g_config_value_.page_writer_max_point_num_ = 5;
g_config_value_.page_writer_max_memory_bytes_ = 128 * 1024; // 128 k
g_config_value_.max_degree_of_index_node_ = 256;
g_config_value_.tsfile_index_bloom_filter_error_percent_ = 0.05;
g_config_value_.record_count_for_next_mem_check_ = 100;
g_config_value_.chunk_group_size_threshold_ = 128 * 1024 * 1024;
// g_config_value_.tsfile_prefix_path_ = "./data";
g_config_value_.tsfile_prefix_path_ = "";
g_config_value_.time_encoding_type_ = TS_2DIFF;
g_config_value_.time_data_type_ = INT64;
g_config_value_.time_compress_type_ = LZ4;
g_config_value_.memtable_flusher_poll_interval_seconds_ = 1;
}

void config_set_page_max_point_count(uint32_t page_max_ponint_count) {
Expand Down
8 changes: 0 additions & 8 deletions cpp/src/common/global.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,6 @@
namespace common {

extern ConfigValue g_config_value_;

FORCE_INLINE bool wal_cfg_enabled() {
return g_config_value_.wal_flush_policy_ != WAL_DISABLED;
}
FORCE_INLINE bool wal_cfg_should_wait_persisted() {
return g_config_value_.wal_flush_policy_ >= WAL_FLUSH;
}

extern ColumnDesc g_time_column_desc;
extern int init_common();
extern bool is_timestamp_column_name(const char *time_col_name);
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/common/schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ typedef std::pair<MeasurementSchemaMapIter, bool>
struct MeasurementSchemaGroup {
// measurement_name -> MeasurementSchema
MeasurementSchemaMap measurement_schema_map_;
bool is_aligned_;
bool is_aligned_ = false;
TimeChunkWriter *time_chunk_writer_ = nullptr;
};

Expand Down
2 changes: 1 addition & 1 deletion cpp/src/common/statistic.h
Original file line number Diff line number Diff line change
Expand Up @@ -916,7 +916,7 @@ class StatisticFactory {
ASSERT(false);
break;
case common::VECTOR:
ALLOC_STATISTIC(TimeStatistic);
ALLOC_STATISTIC_WITH_PA(TimeStatistic);
break;
default:
ASSERT(false);
Expand Down
Loading

0 comments on commit cd418aa

Please sign in to comment.