[chore](check) enable shorten-64-to-32 error (apache#41197)
```
doris/be/src/pipeline/pipeline_fragment_context.cpp:1160:76: error: implicit conversion loses integer precision: 'size_type' (aka 'unsigned long') to 'int' [-Werror,-Wshorten-64-to-32]
 1159 |         _sink.reset(new MultiCastDataStreamSinkOperatorX(
      |                         ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 1160 |                 sink_id, sources, thrift_sink.multi_cast_stream_sink.sinks.size(), pool,
      |                                   ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^~~~~~
```
Mryange authored Sep 29, 2024
1 parent cfe1506 commit 9bd671f
Showing 17 changed files with 172 additions and 45 deletions.
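
For context, a minimal standalone sketch of the conversion Clang now rejects and the shape of the explicit fix (illustrative code, not from this commit; `use_count` is a made-up stand-in):

```
#include <vector>

int use_count(int n) { return n; }

int main() {
    std::vector<int> sinks(3);
    // With Clang and -Werror,-Wshorten-64-to-32, the implicit size_t -> int
    // conversion below fails to compile on 64-bit targets:
    //     return use_count(sinks.size());
    // The fix is to narrow explicitly; this commit also adds a checked helper,
    // cast_set (see be/src/common/cast_set.h below), for the same purpose.
    return use_count(static_cast<int>(sinks.size()));
}
```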
73 changes: 73 additions & 0 deletions be/src/common/cast_set.h
@@ -0,0 +1,73 @@

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <limits>
#include <type_traits>

#include "common/exception.h"
#include "common/status.h"

namespace doris {

template <typename T, typename U>
void check_cast_value(U b) {
    if constexpr (std::is_unsigned_v<U>) {
        if (b > std::numeric_limits<T>::max()) {
            throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
                                   "value {} cast to type {} out of range [{},{}]", b,
                                   typeid(T).name(), std::numeric_limits<T>::min(),
                                   std::numeric_limits<T>::max());
        }
    } else if constexpr (std::is_unsigned_v<T>) {
        if (b < 0 || b > std::numeric_limits<T>::max()) {
            throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
                                   "value {} cast to type {} out of range [{},{}]", b,
                                   typeid(T).name(), std::numeric_limits<T>::min(),
                                   std::numeric_limits<T>::max());
        }
    } else {
        if (b < std::numeric_limits<T>::min() || b > std::numeric_limits<T>::max()) {
            throw doris::Exception(ErrorCode::INVALID_ARGUMENT,
                                   "value {} cast to type {} out of range [{},{}]", b,
                                   typeid(T).name(), std::numeric_limits<T>::min(),
                                   std::numeric_limits<T>::max());
        }
    }
}

template <typename T, typename U, bool need_check_value = true>
    requires std::is_integral_v<T> && std::is_integral_v<U>
void cast_set(T& a, U b) {
    if constexpr (need_check_value) {
        check_cast_value<T>(b);
    }
    a = static_cast<T>(b);
}

template <typename T, typename U, bool need_check_value = true>
    requires std::is_integral_v<T> && std::is_integral_v<U>
T cast_set(U b) {
    if constexpr (need_check_value) {
        check_cast_value<T>(b);
    }
    return static_cast<T>(b);
}

} // namespace doris
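
A brief usage sketch of the two overloads (illustrative only; the call sites fixed by this commit live in files not expanded on this page):

```
#include "common/cast_set.h"

#include <vector>

void example(const std::vector<int>& sinks) {
    int n = 0;
    doris::cast_set(n, sinks.size());            // writes through the reference, range-checked
    int m = doris::cast_set<int>(sinks.size());  // returns the narrowed value, range-checked
    // Callers that already guarantee the value fits can skip the runtime check:
    int k = doris::cast_set<int, size_t, false>(sinks.size());
    (void)n;
    (void)m;
    (void)k;
}
```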
24 changes: 24 additions & 0 deletions be/src/common/compile_check_begin.h
@@ -0,0 +1,24 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic error "-Wshorten-64-to-32"
#endif
//#include "common/compile_check_begin.h"
23 changes: 23 additions & 0 deletions be/src/common/compile_check_end.h
@@ -0,0 +1,23 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#ifdef __clang__
#pragma clang diagnostic pop
#endif
// #include "common/compile_check_end.h"
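
These two headers are intended to bracket the code that opts in to the check: a translation unit includes compile_check_begin.h right after opening its namespace (as every .cpp hunk below shows) and, presumably, compile_check_end.h before the namespace closes; the trailing comments in both headers look like copy-paste helpers for exactly that placement. A sketch of the pattern, with illustrative file contents:

```
#include <vector>

namespace doris::pipeline {
#include "common/compile_check_begin.h" // from here on, Clang treats -Wshorten-64-to-32 as an error

inline int checked_row_count(const std::vector<int>& rows) {
    // return rows.size();                 // would no longer compile: size_t -> int
    return static_cast<int>(rows.size());  // explicit narrowing is accepted
}

#include "common/compile_check_end.h" // pops the diagnostic state pushed by the begin header
} // namespace doris::pipeline
```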
18 changes: 9 additions & 9 deletions be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -27,7 +27,7 @@
#include "vec/exprs/vectorized_agg_fn.h"

namespace doris::pipeline {

#include "common/compile_check_begin.h"
/// The minimum reduction factor (input rows divided by output rows) to grow hash tables
/// in a streaming preaggregation, given that the hash tables are currently the given
/// size or above. The sizes roughly correspond to hash table sizes where the bucket
@@ -266,7 +266,7 @@ Status AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b
vectorized::ColumnRawPtrs key_columns(key_size);
std::vector<int> key_locs(key_size);

for (size_t i = 0; i < key_size; ++i) {
for (int i = 0; i < key_size; ++i) {
if constexpr (for_spill) {
key_columns[i] = block->get_by_position(i).column.get();
key_locs[i] = i;
@@ -279,7 +279,7 @@ Status AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b
}
}

int rows = block->rows();
size_t rows = block->rows();
if (_places.size() < rows) {
_places.resize(rows);
}
Expand Down Expand Up @@ -336,7 +336,7 @@ Status AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* b
if (need_do_agg) {
for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) {
if (Base::_shared_state->aggregate_evaluators[i]->is_merge() || for_spill) {
int col_id = 0;
size_t col_id = 0;
if constexpr (for_spill) {
col_id = Base::_shared_state->probe_expr_ctxs.size() + i;
} else {
@@ -459,7 +459,7 @@ Status AggSinkLocalState::_execute_with_serialized_key_helper(vectorized::Block*
}
}

int rows = block->rows();
size_t rows = block->rows();
if (_places.size() < rows) {
_places.resize(rows);
}
@@ -756,7 +756,7 @@ Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) {
const auto& agg_sort_info = tnode.agg_node.agg_sort_info_by_group_key;
DCHECK_EQ(agg_sort_info.nulls_first.size(), agg_sort_info.is_asc_order.size());

const int order_by_key_size = agg_sort_info.is_asc_order.size();
const size_t order_by_key_size = agg_sort_info.is_asc_order.size();
_order_directions.resize(order_by_key_size);
_null_directions.resize(order_by_key_size);
for (int i = 0; i < order_by_key_size; ++i) {
@@ -777,16 +777,16 @@ Status AggSinkOperatorX::open(RuntimeState* state) {
RETURN_IF_ERROR(vectorized::VExpr::prepare(
_probe_expr_ctxs, state, DataSinkOperatorX<AggSinkLocalState>::_child->row_desc()));

int j = _probe_expr_ctxs.size();
for (int i = 0; i < j; ++i) {
size_t j = _probe_expr_ctxs.size();
for (size_t i = 0; i < j; ++i) {
auto nullable_output = _output_tuple_desc->slots()[i]->is_nullable();
auto nullable_input = _probe_expr_ctxs[i]->root()->is_nullable();
if (nullable_output != nullable_input) {
DCHECK(nullable_output);
_make_nullable_keys.emplace_back(i);
}
}
for (int i = 0; i < _aggregate_evaluators.size(); ++i, ++j) {
for (size_t i = 0; i < _aggregate_evaluators.size(); ++i, ++j) {
SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j];
SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j];
RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare(
18 changes: 9 additions & 9 deletions be/src/pipeline/exec/aggregation_source_operator.cpp
@@ -25,6 +25,7 @@
#include "vec/exprs/vectorized_agg_fn.h"

namespace doris::pipeline {
#include "common/compile_check_begin.h"

AggLocalState::AggLocalState(RuntimeState* state, OperatorXBase* parent)
: Base(state, parent),
@@ -106,8 +107,8 @@ Status AggLocalState::_serialize_with_serialized_key_result(RuntimeState* state,
vectorized::Block* block, bool* eos) {
SCOPED_TIMER(_serialize_result_timer);
auto& shared_state = *_shared_state;
int key_size = _shared_state->probe_expr_ctxs.size();
int agg_size = _shared_state->aggregate_evaluators.size();
size_t key_size = _shared_state->probe_expr_ctxs.size();
size_t agg_size = _shared_state->aggregate_evaluators.size();
vectorized::MutableColumns value_columns(agg_size);
vectorized::DataTypes value_data_types(agg_size);

@@ -229,7 +230,7 @@ Status AggLocalState::_get_with_serialized_key_result(RuntimeState* state, vecto

auto columns_with_schema = vectorized::VectorizedUtils::create_columns_with_type_and_name(
_parent->cast<AggSourceOperatorX>()._row_descriptor);
int key_size = shared_state.probe_expr_ctxs.size();
size_t key_size = shared_state.probe_expr_ctxs.size();

vectorized::MutableColumns key_columns;
for (int i = 0; i < key_size; ++i) {
@@ -240,7 +241,7 @@ Status AggLocalState::_get_with_serialized_key_result(RuntimeState* state, vecto
}
}
vectorized::MutableColumns value_columns;
for (int i = key_size; i < columns_with_schema.size(); ++i) {
for (size_t i = key_size; i < columns_with_schema.size(); ++i) {
if (!mem_reuse) {
value_columns.emplace_back(columns_with_schema[i].type->create_column());
} else {
@@ -346,7 +347,7 @@ Status AggLocalState::_serialize_without_key(RuntimeState* state, vectorized::Bl
block->clear();

DCHECK(shared_state.agg_data->without_key != nullptr);
int agg_size = shared_state.aggregate_evaluators.size();
size_t agg_size = shared_state.aggregate_evaluators.size();

vectorized::MutableColumns value_columns(agg_size);
std::vector<vectorized::DataTypePtr> data_types(agg_size);
@@ -385,7 +386,7 @@ Status AggLocalState::_get_without_key_result(RuntimeState* state, vectorized::B

auto& p = _parent->cast<AggSourceOperatorX>();
*block = vectorized::VectorizedUtils::create_empty_columnswithtypename(p._row_descriptor);
int agg_size = shared_state.aggregate_evaluators.size();
size_t agg_size = shared_state.aggregate_evaluators.size();

vectorized::MutableColumns columns(agg_size);
std::vector<vectorized::DataTypePtr> data_types(agg_size);
@@ -495,7 +496,7 @@ Status AggLocalState::merge_with_serialized_key_helper(vectorized::Block* block)
key_columns[i] = block->get_by_position(i).column.get();
}

int rows = block->rows();
size_t rows = block->rows();
if (_places.size() < rows) {
_places.resize(rows);
}
@@ -539,8 +540,7 @@ Status AggLocalState::merge_with_serialized_key_helper(vectorized::Block* block)
_emplace_into_hash_table(_places.data(), key_columns, rows);

for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) {
int col_id = 0;
col_id = Base::_shared_state->probe_expr_ctxs.size() + i;
auto col_id = Base::_shared_state->probe_expr_ctxs.size() + i;
auto column = block->get_by_position(col_id).column;
if (column->is_nullable()) {
column = ((vectorized::ColumnNullable*)column.get())->get_nested_column_ptr();
3 changes: 2 additions & 1 deletion be/src/pipeline/exec/analytic_sink_operator.cpp
@@ -24,6 +24,7 @@
#include "vec/exprs/vectorized_agg_fn.h"

namespace doris::pipeline {
#include "common/compile_check_begin.h"

Status AnalyticSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) {
RETURN_IF_ERROR(PipelineXSinkLocalState<AnalyticSharedState>::init(state, info));
@@ -91,7 +92,7 @@ bool AnalyticSinkLocalState::_whether_need_next_partition(BlockRowPos& found_par
}

//_partition_by_columns,_order_by_columns save in blocks, so if need to calculate the boundary, may find in which blocks firstly
BlockRowPos AnalyticSinkLocalState::_compare_row_to_find_end(int idx, BlockRowPos start,
BlockRowPos AnalyticSinkLocalState::_compare_row_to_find_end(int64_t idx, BlockRowPos start,
BlockRowPos end,
bool need_check_first) {
auto& shared_state = *_shared_state;
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/analytic_sink_operator.h
@@ -53,7 +53,7 @@ class AnalyticSinkLocalState : public PipelineXSinkLocalState<AnalyticSharedStat
return need_more_input;
}
BlockRowPos _get_partition_by_end();
BlockRowPos _compare_row_to_find_end(int idx, BlockRowPos start, BlockRowPos end,
BlockRowPos _compare_row_to_find_end(int64_t idx, BlockRowPos start, BlockRowPos end,
bool need_check_first = false);
bool _whether_need_next_partition(BlockRowPos& found_partition_end);

7 changes: 4 additions & 3 deletions be/src/pipeline/exec/analytic_source_operator.cpp
@@ -24,6 +24,7 @@
#include "vec/exprs/vectorized_agg_fn.h"

namespace doris::pipeline {
#include "common/compile_check_begin.h"

AnalyticLocalState::AnalyticLocalState(RuntimeState* state, OperatorXBase* parent)
: PipelineXLocalState<AnalyticSharedState>(state, parent),
@@ -38,7 +39,7 @@ AnalyticLocalState::AnalyticLocalState(RuntimeState* state, OperatorXBase* paren
_agg_arena_pool(std::make_unique<vectorized::Arena>()) {}

//_partition_by_columns,_order_by_columns save in blocks, so if need to calculate the boundary, may find in which blocks firstly
BlockRowPos AnalyticLocalState::_compare_row_to_find_end(int idx, BlockRowPos start,
BlockRowPos AnalyticLocalState::_compare_row_to_find_end(int64_t idx, BlockRowPos start,
BlockRowPos end, bool need_check_first) {
auto& shared_state = *_shared_state;
int64_t start_init_row_num = start.row_num;
@@ -320,8 +321,8 @@ void AnalyticLocalState::_insert_result_info(int64_t current_block_rows) {

const auto& offsets_of_aggregate_states =
_parent->cast<AnalyticSourceOperatorX>()._offsets_of_aggregate_states;
for (int i = 0; i < _agg_functions_size; ++i) {
for (int j = get_result_start; j < _window_end_position; ++j) {
for (size_t i = 0; i < _agg_functions_size; ++i) {
for (size_t j = get_result_start; j < _window_end_position; ++j) {
if (!_agg_functions[i]->function()->get_return_type()->is_nullable() &&
_result_window_columns[i]->is_nullable()) {
if (_current_window_empty) {
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/analytic_source_operator.h
@@ -67,7 +67,7 @@ class AnalyticLocalState final : public PipelineXLocalState<AnalyticSharedState>
return need_more_input;
}
BlockRowPos _get_partition_by_end();
BlockRowPos _compare_row_to_find_end(int idx, BlockRowPos start, BlockRowPos end,
BlockRowPos _compare_row_to_find_end(int64_t idx, BlockRowPos start, BlockRowPos end,
bool need_check_first = false);
bool _whether_need_next_partition(BlockRowPos& found_partition_end);

3 changes: 2 additions & 1 deletion be/src/pipeline/exec/scan_operator.cpp
@@ -45,6 +45,7 @@

namespace doris::pipeline {

#include "common/compile_check_begin.h"
const static int32_t ADAPTIVE_PIPELINE_TASK_SERIAL_READ_ON_LIMIT_DEFAULT = 10000;

#define RETURN_IF_PUSH_DOWN(stmt, status) \
@@ -682,7 +683,7 @@ Status ScanLocalState<Derived>::_should_push_down_binary_predicate(
DCHECK(constant_val->data == nullptr) << "constant_val should not have a value";
const auto& children = fn_call->children();
DCHECK(children.size() == 2);
for (size_t i = 0; i < children.size(); i++) {
for (int i = 0; i < 2; i++) {
if (vectorized::VExpr::expr_without_cast(children[i])->node_type() !=
TExprNodeType::SLOT_REF) {
// not a slot ref(column)