From af589c0b13b0af8c3ea8d17753b820f9ee20aba2 Mon Sep 17 00:00:00 2001 From: yiguolei <676222867@qq.com> Date: Mon, 22 Apr 2024 00:10:03 +0800 Subject: [PATCH] [memtracker](accuracy) should not account resuable buffer to query memtracker (#33933) Co-authored-by: yiguolei --- be/src/util/block_compression.cpp | 118 ++++++++++++++++-------------- 1 file changed, 65 insertions(+), 53 deletions(-) diff --git a/be/src/util/block_compression.cpp b/be/src/util/block_compression.cpp index f3b1e781e7ea10..21f7e72f5d935f 100644 --- a/be/src/util/block_compression.cpp +++ b/be/src/util/block_compression.cpp @@ -96,13 +96,20 @@ class Lz4BlockCompression : public BlockCompressionCodec { ENABLE_FACTORY_CREATOR(Context); public: - Context() : ctx(nullptr) {} + Context() : ctx(nullptr) { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + buffer = std::make_unique(); + } LZ4_stream_t* ctx; - faststring buffer; + std::unique_ptr buffer; ~Context() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); if (ctx) { LZ4_freeStream(ctx); } + buffer.reset(); } }; @@ -118,8 +125,6 @@ class Lz4BlockCompression : public BlockCompressionCodec { } Status compress(const Slice& input, faststring* output) override { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); if (input.size > INT_MAX) { return Status::InvalidArgument( "LZ4 not support those case(input.size>INT_MAX), maybe you should change " @@ -144,8 +149,14 @@ class Lz4BlockCompression : public BlockCompressionCodec { compressed_buf.size = max_len; } else { // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE - context->buffer.resize(max_len); - compressed_buf.data = reinterpret_cast(context->buffer.data()); + { + // context->buffer is reusable between queries, so it should be accounted to + // the global tracker. 
+ SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + context->buffer->resize(max_len); + } + compressed_buf.data = reinterpret_cast(context->buffer->data()); compressed_buf.size = max_len; } @@ -165,8 +176,6 @@ class Lz4BlockCompression : public BlockCompressionCodec { } Status decompress(const Slice& input, Slice* output) override { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); auto decompressed_len = LZ4_decompress_safe(input.data, output->data, input.size, output->size); if (decompressed_len < 0) { @@ -218,8 +227,6 @@ class HadoopLz4BlockCompression : public Lz4BlockCompression { return &s_instance; } Status decompress(const Slice& input, Slice* output) override { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); RETURN_IF_ERROR(Decompressor::create_decompressor(CompressType::LZ4BLOCK, &_decompressor)); size_t input_bytes_read = 0; size_t decompressed_len = 0; @@ -245,13 +252,20 @@ class Lz4fBlockCompression : public BlockCompressionCodec { ENABLE_FACTORY_CREATOR(CContext); public: - CContext() : ctx(nullptr) {} + CContext() : ctx(nullptr) { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + buffer = std::make_unique(); + } LZ4F_compressionContext_t ctx; - faststring buffer; + std::unique_ptr buffer; ~CContext() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); if (ctx) { LZ4F_freeCompressionContext(ctx); } + buffer.reset(); } }; class DContext { @@ -301,8 +315,6 @@ class Lz4fBlockCompression : public BlockCompressionCodec { private: Status _compress(const std::vector& inputs, size_t uncompressed_size, faststring* output) { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); std::unique_ptr context; 
RETURN_IF_ERROR(_acquire_compression_ctx(context)); bool compress_failed = false; @@ -319,9 +331,13 @@ class Lz4fBlockCompression : public BlockCompressionCodec { compressed_buf.data = reinterpret_cast(output->data()); compressed_buf.size = max_len; } else { - // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE - context->buffer.resize(max_len); - compressed_buf.data = reinterpret_cast(context->buffer.data()); + { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE + context->buffer->resize(max_len); + } + compressed_buf.data = reinterpret_cast(context->buffer->data()); compressed_buf.size = max_len; } @@ -361,8 +377,6 @@ class Lz4fBlockCompression : public BlockCompressionCodec { } Status _decompress(const Slice& input, Slice* output) { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); bool decompress_failed = false; std::unique_ptr context; RETURN_IF_ERROR(_acquire_decompression_ctx(context)); @@ -472,13 +486,20 @@ class Lz4HCBlockCompression : public BlockCompressionCodec { ENABLE_FACTORY_CREATOR(Context); public: - Context() : ctx(nullptr) {} + Context() : ctx(nullptr) { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + buffer = std::make_unique(); + } LZ4_streamHC_t* ctx; - faststring buffer; + std::unique_ptr buffer; ~Context() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); if (ctx) { LZ4_freeStreamHC(ctx); } + buffer.reset(); } }; @@ -494,8 +515,6 @@ class Lz4HCBlockCompression : public BlockCompressionCodec { } Status compress(const Slice& input, faststring* output) override { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); std::unique_ptr context; 
RETURN_IF_ERROR(_acquire_compression_ctx(context)); bool compress_failed = false; @@ -512,9 +531,13 @@ class Lz4HCBlockCompression : public BlockCompressionCodec { compressed_buf.data = reinterpret_cast(output->data()); compressed_buf.size = max_len; } else { - // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE - context->buffer.resize(max_len); - compressed_buf.data = reinterpret_cast(context->buffer.data()); + { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE + context->buffer->resize(max_len); + } + compressed_buf.data = reinterpret_cast(context->buffer->data()); compressed_buf.size = max_len; } @@ -533,8 +556,6 @@ class Lz4HCBlockCompression : public BlockCompressionCodec { } Status decompress(const Slice& input, Slice* output) override { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); auto decompressed_len = LZ4_decompress_safe(input.data, output->data, input.size, output->size); if (decompressed_len < 0) { @@ -654,8 +675,6 @@ class SnappyBlockCompression : public BlockCompressionCodec { ~SnappyBlockCompression() override {} Status compress(const Slice& input, faststring* output) override { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); size_t max_len = max_compressed_len(input.size); output->resize(max_len); Slice s(*output); @@ -666,8 +685,6 @@ class SnappyBlockCompression : public BlockCompressionCodec { } Status decompress(const Slice& input, Slice* output) override { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); if (!snappy::RawUncompress(input.data, input.size, output->data)) { return Status::InvalidArgument("Fail to do Snappy decompress"); } @@ -699,8 +716,6 @@ class ZlibBlockCompression : public BlockCompressionCodec { 
~ZlibBlockCompression() {} Status compress(const Slice& input, faststring* output) override { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); size_t max_len = max_compressed_len(input.size); output->resize(max_len); Slice s(*output); @@ -715,8 +730,6 @@ class ZlibBlockCompression : public BlockCompressionCodec { Status compress(const std::vector& inputs, size_t uncompressed_size, faststring* output) override { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); size_t max_len = max_compressed_len(uncompressed_size); output->resize(max_len); @@ -757,8 +770,6 @@ class ZlibBlockCompression : public BlockCompressionCodec { } Status decompress(const Slice& input, Slice* output) override { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); size_t input_size = input.size; auto zres = ::uncompress2((Bytef*)output->data, &output->size, (Bytef*)input.data, &input_size); @@ -781,13 +792,20 @@ class ZstdBlockCompression : public BlockCompressionCodec { ENABLE_FACTORY_CREATOR(CContext); public: - CContext() : ctx(nullptr) {} + CContext() : ctx(nullptr) { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + buffer = std::make_unique(); + } ZSTD_CCtx* ctx; - faststring buffer; + std::unique_ptr buffer; ~CContext() { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); if (ctx) { ZSTD_freeCCtx(ctx); } + buffer.reset(); } }; class DContext { @@ -826,8 +844,6 @@ class ZstdBlockCompression : public BlockCompressionCodec { // https://github.com/facebook/zstd/blob/dev/examples/streaming_compression.c Status compress(const std::vector& inputs, size_t uncompressed_size, faststring* output) override { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); std::unique_ptr context; 
RETURN_IF_ERROR(_acquire_compression_ctx(context)); bool compress_failed = false; @@ -845,9 +861,13 @@ class ZstdBlockCompression : public BlockCompressionCodec { compressed_buf.data = reinterpret_cast(output->data()); compressed_buf.size = max_len; } else { - // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE - context->buffer.resize(max_len); - compressed_buf.data = reinterpret_cast(context->buffer.data()); + { + SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( + ExecEnv::GetInstance()->block_compression_mem_tracker()); + // reuse context buffer if max_len <= MAX_COMPRESSION_BUFFER_FOR_REUSE + context->buffer->resize(max_len); + } + compressed_buf.data = reinterpret_cast(context->buffer->data()); compressed_buf.size = max_len; } @@ -904,8 +924,6 @@ class ZstdBlockCompression : public BlockCompressionCodec { } Status decompress(const Slice& input, Slice* output) override { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); std::unique_ptr context; bool decompress_failed = false; RETURN_IF_ERROR(_acquire_decompression_ctx(context)); @@ -1001,8 +1019,6 @@ class GzipBlockCompression : public ZlibBlockCompression { ~GzipBlockCompression() override = default; Status decompress(const Slice& input, Slice* output) override { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); z_stream z_strm = {}; z_strm.zalloc = Z_NULL; z_strm.zfree = Z_NULL; @@ -1084,8 +1100,6 @@ class GzipBlockCompressionByLibdeflate final : public GzipBlockCompression { ~GzipBlockCompressionByLibdeflate() override = default; Status decompress(const Slice& input, Slice* output) override { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); if (input.empty()) { output->size = 0; return Status::OK(); @@ -1118,8 +1132,6 @@ class LzoBlockCompression final : public BlockCompressionCodec { } size_t max_compressed_len(size_t len) override { 
return 0; }; Status decompress(const Slice& input, Slice* output) override { - SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER( - ExecEnv::GetInstance()->block_compression_mem_tracker()); auto* input_ptr = input.data; auto remain_input_size = input.size; auto* output_ptr = output->data;