Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangstar333 committed Dec 11, 2024
1 parent 01af7ae commit d79699d
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 32 deletions.
14 changes: 7 additions & 7 deletions be/src/pipeline/exec/analytic_source_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -352,17 +352,17 @@ Status AnalyticLocalState::_get_next_for_rows(size_t current_block_rows) {
int64_t range_start, range_end;
if (!_parent->cast<AnalyticSourceOperatorX>()._window.__isset.window_start &&
_parent->cast<AnalyticSourceOperatorX>()._window.window_end.type ==
TAnalyticWindowBoundaryType::
CURRENT_ROW) { //[preceding, current_row],[current_row, following]
TAnalyticWindowBoundaryType::CURRENT_ROW) {
// [preceding, current_row], [current_row, following] rewrite it's same
// as could reuse the previous calculate result, so don't call _reset_agg_status function
// going on calculate, add up data, no need to reset state
range_start = _shared_state->current_row_position;
range_end = _shared_state->current_row_position +
1; //going on calculate,add up data, no need to reset state
range_end = _shared_state->current_row_position + 1;
} else {
_reset_agg_status();
range_end = _shared_state->current_row_position + _rows_end_offset + 1;
if (!_parent->cast<AnalyticSourceOperatorX>()
._window.__isset
.window_start) { //[preceding, offset] --unbound: [preceding, following]
//[preceding, offset] --unbound: [preceding, following]
if (!_parent->cast<AnalyticSourceOperatorX>()._window.__isset.window_start) {
range_start = _partition_by_start.pos;
} else {
range_start = _shared_state->current_row_position + _rows_start_offset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@ struct Value {
_offset = 0;
}

size_t offset_pos() { return _offset; }

protected:
const IColumn* _ptr = nullptr;
size_t _offset = 0;
Expand Down Expand Up @@ -103,11 +101,6 @@ struct CopiedValue : public Value<ColVecType, arg_is_nullable> {
}
}

size_t offset_pos() {
DCHECK(false) << " should call this in CopiedValue";
return 0;
}

private:
Field _copied_value;
};
Expand Down Expand Up @@ -149,8 +142,6 @@ struct ReaderFirstAndLastData {

bool has_set_value() { return _has_value; }

size_t offset_pos() { return _data_value.offset_pos(); }

bool is_null() { return _data_value.is_null(); }

protected:
Expand Down
2 changes: 2 additions & 0 deletions be/src/vec/aggregate_functions/aggregate_function_window.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ AggregateFunctionPtr create_function_lead_lag_first_last(const String& name,
WhichDataType which(*type);

bool arg_ignore_null_value = false;
// FE have rewrite case first_value(k1,false)--->first_value(k1)
// so size is 2, must will be arg_ignore_null_value
if (argument_types.size() == 2) {
DCHECK(name == "first_value" || name == "last_value") << "invalid function name: " << name;
arg_ignore_null_value = true;
Expand Down
35 changes: 19 additions & 16 deletions be/src/vec/aggregate_functions/aggregate_function_window.h
Original file line number Diff line number Diff line change
Expand Up @@ -474,14 +474,9 @@ struct WindowFunctionFirstImpl : Data {

if constexpr (arg_ignore_null) {
frame_end = std::min<int64_t>(frame_end, partition_end);
if (this->has_set_value()) {
frame_start = this->offset_pos();
}
const auto& second_arg = assert_cast<const ColumnVector<UInt8>&>(*columns[1]);
auto ignore_null_value = second_arg.get_data()[0];

if (ignore_null_value && columns[0]->is_nullable()) {
if (columns[0]->is_nullable()) {
const auto& arg_nullable = assert_cast<const ColumnNullable&>(*columns[0]);
// the valid range is: [frame_start, frame_end)
while (frame_start < frame_end - 1 && arg_nullable.is_null_at(frame_start)) {
frame_start++;
}
Expand All @@ -507,17 +502,25 @@ struct WindowFunctionLastImpl : Data {

if constexpr (arg_ignore_null) {
frame_start = std::max<int64_t>(frame_start, partition_start);
if (this->has_set_value()) {
frame_start = this->offset_pos();
}
const auto& second_arg = assert_cast<const ColumnVector<UInt8>&>(*columns[1]);
auto ignore_null_value = second_arg.get_data()[0];

if (ignore_null_value && columns[0]->is_nullable()) {
if (columns[0]->is_nullable()) {
const auto& arg_nullable = assert_cast<const ColumnNullable&>(*columns[0]);
while (frame_start < (frame_end - 1) && arg_nullable.is_null_at(frame_end - 1)) {
frame_end--;
// wants find a not null value in [frame_start, frame_end)
// iff has find: set_value and return directly
// iff not find: the while loop is finished
// case 1: iff has_set_value, means the previous window have value, could reuse it, so return directly
// case 2: iff not has_set_value, means there is none value, set it's to NULL
while (frame_start < frame_end) {
if (arg_nullable.is_null_at(frame_end - 1)) {
frame_end--;
} else {
this->set_value(columns, frame_end - 1);
return;
}
}
if (!this->has_set_value()) {
this->set_is_null();
}
return;
}
}

Expand Down

0 comments on commit d79699d

Please sign in to comment.