Skip to content

Commit

Permalink
Merge branch 'master' into 20240920_fix_memory
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz authored Sep 27, 2024
2 parents 8d3b495 + d6f2c2c commit cb1aabf
Show file tree
Hide file tree
Showing 723 changed files with 89,250 additions and 1,336 deletions.
4 changes: 4 additions & 0 deletions be/src/olap/memtable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,10 @@ Status MemTable::_to_block(std::unique_ptr<vectorized::Block>* res) {
}
if (_keys_type == KeysType::UNIQUE_KEYS && _enable_unique_key_mow &&
!_tablet_schema->cluster_key_idxes().empty()) {
if (_is_partial_update) {
return Status::InternalError(
"Partial update for mow with cluster keys is not supported");
}
RETURN_IF_ERROR(_sort_by_cluster_keys());
}
g_memtable_input_block_allocated_size << -_input_mutable_block.allocated_bytes();
Expand Down
35 changes: 21 additions & 14 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,13 +132,17 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
PUniqueId id;
id.set_hi(_state->query_id().hi);
id.set_lo(_state->query_id().lo);
_sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, p._dest_node_id, _sender_id,
_state->be_number(), state, this);

register_channels(_sink_buffer.get());
_queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"ExchangeSinkQueueDependency", true);
_sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
if (!only_local_exchange) {
_sink_buffer = std::make_unique<ExchangeSinkBuffer>(id, p._dest_node_id, _sender_id,
_state->be_number(), state, this);
register_channels(_sink_buffer.get());
_queue_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
"ExchangeSinkQueueDependency", true);
_sink_buffer->set_dependency(_queue_dependency, _finish_dependency);
_finish_dependency->block();
}

if ((_part_type == TPartitionType::UNPARTITIONED || channels.size() == 1) &&
!only_local_exchange) {
_broadcast_dependency = Dependency::create_shared(
Expand Down Expand Up @@ -244,7 +248,6 @@ Status ExchangeSinkLocalState::open(RuntimeState* state) {
fmt::format("Crc32HashPartitioner({})", _partition_count));
}

_finish_dependency->block();
if (_part_type == TPartitionType::HASH_PARTITIONED ||
_part_type == TPartitionType::BUCKET_SHFFULE_HASH_PARTITIONED ||
_part_type == TPartitionType::TABLE_SINK_HASH_PARTITIONED) {
Expand Down Expand Up @@ -559,8 +562,9 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block
final_st = st;
}
}
local_state._sink_buffer->set_should_stop();
return final_st;
if (local_state._sink_buffer) {
local_state._sink_buffer->set_should_stop();
}
}
return final_st;
}
Expand Down Expand Up @@ -631,11 +635,14 @@ Status ExchangeSinkOperatorX::channel_add_rows_with_idx(
std::string ExchangeSinkLocalState::debug_string(int indentation_level) const {
fmt::memory_buffer debug_string_buffer;
fmt::format_to(debug_string_buffer, "{}", Base::debug_string(indentation_level));
fmt::format_to(debug_string_buffer,
", Sink Buffer: (_should_stop = {}, _busy_channels = {}, _is_finishing = {}), "
"_reach_limit: {}",
_sink_buffer->_should_stop.load(), _sink_buffer->_busy_channels.load(),
_sink_buffer->_is_finishing.load(), _reach_limit.load());
if (_sink_buffer) {
fmt::format_to(
debug_string_buffer,
", Sink Buffer: (_should_stop = {}, _busy_channels = {}, _is_finishing = {}), "
"_reach_limit: {}",
_sink_buffer->_should_stop.load(), _sink_buffer->_busy_channels.load(),
_sink_buffer->_is_finishing.load(), _reach_limit.load());
}
return fmt::to_string(debug_string_buffer);
}

Expand Down
4 changes: 3 additions & 1 deletion be/src/pipeline/exec/exchange_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,9 @@ class ExchangeSinkLocalState final : public PipelineXSinkLocalState<> {

std::vector<Dependency*> dependencies() const override {
std::vector<Dependency*> dep_vec;
dep_vec.push_back(_queue_dependency.get());
if (_queue_dependency) {
dep_vec.push_back(_queue_dependency.get());
}
if (_broadcast_dependency) {
dep_vec.push_back(_broadcast_dependency.get());
}
Expand Down
8 changes: 5 additions & 3 deletions be/src/pipeline/exec/partition_sort_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -298,14 +298,16 @@ Status PartitionSortSinkOperatorX::_emplace_into_hash_table(
SCOPED_TIMER(local_state._selector_block_timer);
RETURN_IF_ERROR(place->append_block_by_selector(input_block, eos));
}
if (local_state._is_need_passthrough) {
// Perform passthrough for the rows in the range [0, row] of input_block
if (local_state._is_need_passthrough && row >= 0) {
{
COUNTER_UPDATE(local_state._passthrough_rows_counter,
(int64_t)(num_rows - row));
(int64_t)(row + 1));
std::lock_guard<std::mutex> lock(
local_state._shared_state->buffer_mutex);
// Rows (row, num_rows - 1] have been emplaced into the hashtable; the remaining row + 1 rows stay in the block for passthrough.
input_block->set_num_rows(row);
// set_num_rows(x) retains the range [0, x - 1], so row + 1 is needed here.
input_block->set_num_rows(row + 1);
local_state._shared_state->blocks_buffer.push(
std::move(*input_block));
// buffer have data, source could read this.
Expand Down
8 changes: 7 additions & 1 deletion be/src/util/counts.h
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,12 @@ class Counts {
}
}

void increment(Ty key) { _nums.push_back(key); }

void increment_batch(const vectorized::PaddedPODArray<Ty>& keys) {
_nums.insert(keys.begin(), keys.end());
}

void serialize(vectorized::BufferWritable& buf) {
if (!_nums.empty()) {
pdqsort(_nums.begin(), _nums.end());
Expand Down Expand Up @@ -234,7 +240,7 @@ class Counts {
int array_index;
int64_t element_index;

std::strong_ordering operator<=>(const Node& other) const { return value <=> other.value; }
auto operator<=>(const Node& other) const { return value <=> other.value; }
};

void _convert_sorted_num_vec_to_nums() {
Expand Down
Loading

0 comments on commit cb1aabf

Please sign in to comment.