Skip to content

Commit

Permalink
refine framework for output
Browse files Browse the repository at this point in the history
  • Loading branch information
MrPresent-Han committed Nov 17, 2024
1 parent 0d5e670 commit 9b6792e
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 10 deletions.
55 changes: 55 additions & 0 deletions internal/core/src/common/FieldData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -321,4 +321,59 @@ InitScalarFieldData(const DataType& type, bool nullable, int64_t cap_rows) {
}
}

void ResizeScalarFieldData(const DataType& type, int64_t new_num_rows, FieldDataPtr& field_data){
switch(type) {
case DataType::BOOL: {
auto inner_field_data = std::dynamic_pointer_cast<FieldData<bool>>(field_data);
inner_field_data->resize_field_data(new_num_rows);
return;
}
case DataType::INT8: {
auto inner_field_data = std::dynamic_pointer_cast<FieldData<int8_t>>(field_data);
inner_field_data->resize_field_data(new_num_rows);
return;
}
case DataType::INT16: {
auto inner_field_data = std::dynamic_pointer_cast<FieldData<int16_t>>(field_data);
inner_field_data->resize_field_data(new_num_rows);
return;
}
case DataType::INT32: {
auto inner_field_data = std::dynamic_pointer_cast<FieldData<int32_t>>(field_data);
inner_field_data->resize_field_data(new_num_rows);
return;
}
case DataType::INT64: {
auto inner_field_data = std::dynamic_pointer_cast<FieldData<int64_t>>(field_data);
inner_field_data->resize_field_data(new_num_rows);
return;
}
case DataType::FLOAT: {
auto inner_field_data = std::dynamic_pointer_cast<FieldData<float>>(field_data);
inner_field_data->resize_field_data(new_num_rows);
return;
}
case DataType::DOUBLE: {
auto inner_field_data = std::dynamic_pointer_cast<FieldData<double>>(field_data);
inner_field_data->resize_field_data(new_num_rows);
return;
}
case DataType::STRING:
case DataType::VARCHAR:{
auto inner_field_data = std::dynamic_pointer_cast<FieldData<std::string>>(field_data);
inner_field_data->resize_field_data(new_num_rows);
return;
}
case DataType::JSON: {
auto inner_field_data = std::dynamic_pointer_cast<FieldData<Json>>(field_data);
inner_field_data->resize_field_data(new_num_rows);
return;
}
default:
PanicInfo(DataTypeInvalid,
"ResizeScalarFieldData not support data type " +
GetDataTypeName(type));
}
}

} // namespace milvus
3 changes: 3 additions & 0 deletions internal/core/src/common/FieldData.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,7 @@ using ArrowReaderChannel = Channel<std::shared_ptr<milvus::ArrowDataWrapper>>;
FieldDataPtr
InitScalarFieldData(const DataType& type, bool nullable, int64_t cap_rows);

// Resizes the scalar field data held by `field_data` to `new_size` rows.
// `type` selects which concrete FieldData<T> `field_data` is cast to before
// resizing; the definition handles BOOL, INT8, INT16, INT32, INT64, FLOAT,
// DOUBLE, STRING/VARCHAR and JSON, and panics with DataTypeInvalid for any
// other data type.
void
ResizeScalarFieldData(const DataType& type, int64_t new_size, FieldDataPtr& field_data);

} // namespace milvus
13 changes: 12 additions & 1 deletion internal/core/src/common/Vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ class ColumnVector final : public SimpleVector {
: SimpleVector(data_type, length, null_count),
is_bitmap_(false),
valid_values_(length,
!null_count.has_value() || null_count.value() == 0) {
!null_count.has_value() || null_count.value() == 0){
values_ = InitScalarFieldData(data_type, false, length);
}

Expand Down Expand Up @@ -188,6 +188,17 @@ class ColumnVector final : public SimpleVector {
return is_bitmap_;
}

// Resizes this column vector to `new_size` rows, keeping the base vector
// state, the value storage and the validity bitmap in step.
//
// `setNotNull` is forwarded to BaseVector::resize and is also used as the
// validity of rows added by a growing resize. Bitmap-backed column vectors
// cannot be resized and trip the assertion below.
void resize(vector_size_t new_size, bool setNotNull=true) override {
    AssertInfo(!is_bitmap_, "Cannot resize bitmap column vector");
    BaseVector::resize(new_size, setNotNull);
    ResizeScalarFieldData(type(), new_size, values_);
    // Fill newly added validity bits with `setNotNull`; a plain
    // resize(new_size) would leave them at the bitmap's default fill and
    // ignore the caller's request.
    // NOTE(review): assumes a set bit in valid_values_ means "valid", as the
    // constructor's initialisation suggests - confirm.
    valid_values_.resize(new_size, setNotNull);
}

// Appends the rows of `other` to this vector's value storage by handing
// other's raw data buffer and row count to FillFieldData.
// NOTE(review): only values_ is touched here; this vector's own length and
// valid_values_ are not visibly updated - confirm callers do not rely on them.
void append(const ColumnVector& other) {
    auto source_data = other.GetRawData();
    auto source_rows = other.size();
    values_->FillFieldData(source_data, source_rows);
}

private:
bool is_bitmap_; // TODO: remove the field after implementing BitmapVector
FieldDataPtr values_;
Expand Down
72 changes: 63 additions & 9 deletions internal/core/src/query/ExecPlanNodeVisitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,42 @@ ExecPlanNodeVisitor::ExecuteTask(
return bitset_holder;
}

// Runs the plan fragment to completion and returns all produced rows as one
// row vector.
//
// A task is created for `plan` and polled with Next() until it yields no
// result. The first batch becomes the return value; every later batch must
// have the same number of columns, and each of its columns is appended to the
// matching column of the accumulated result.
//
// Returns nullptr when the task produces no batch at all.
// Panics with UnexpectedError when a column of a later batch, or the matching
// accumulated column, is not a ColumnVector.
RowVectorPtr
ExecPlanNodeVisitor::ExecuteTask2(plan::PlanFragment &plan, std::shared_ptr<milvus::exec::QueryContext> query_context) {
    LOG_DEBUG("plannode: {}, active_count: {}, timestamp: {}",
              plan.plan_node_->ToString(),
              query_context->get_active_count(),
              query_context->get_query_timestamp());

    auto task = milvus::exec::Task::Create(DEFAULT_TASK_ID, plan, 0, query_context);
    RowVectorPtr ret = nullptr;
    for (;;) {
        auto result = task->Next();
        if (!result) {
            break;
        }
        if (!ret) {
            // First batch: adopt it as the accumulator.
            ret = result;
            continue;
        }

        // Bind by reference so the column list is not copied for every batch.
        const auto& new_columns = result->childrens();
        const auto& ret_columns = ret->childrens();
        AssertInfo(new_columns.size() == ret_columns.size(),
                   "column count of row vectors in different rounds "
                   "should be consistent, ret_column_count:{}, "
                   "new_result_column_count:{}",
                   ret_columns.size(),
                   new_columns.size());
        for (size_t i = 0; i < new_columns.size(); ++i) {
            auto column_vec =
                std::dynamic_pointer_cast<ColumnVector>(new_columns[i]);
            auto ret_column_vector =
                std::dynamic_pointer_cast<ColumnVector>(ret_columns[i]);
            // Both sides must be column vectors; checking the accumulated
            // side as well avoids dereferencing a failed cast.
            if (!column_vec || !ret_column_vector) {
                PanicInfo(UnexpectedError, "expr return type not matched");
            }
            ret_column_vector->append(*column_vec);
        }
    }
    return ret;
}

template <typename VectorType>
void
ExecPlanNodeVisitor::VectorVisitorImpl(VectorPlanNode& node) {
Expand Down Expand Up @@ -133,7 +169,7 @@ ExecPlanNodeVisitor::VectorVisitorImpl(VectorPlanNode& node) {
query_context->set_placeholder_group(placeholder_group_);

// Do plan fragment task work
auto result = ExecuteTask(plan, query_context);
ExecuteTask(plan, query_context);

// Store result
search_result_opt_ = std::move(query_context->get_search_result());
Expand Down Expand Up @@ -182,18 +218,36 @@ ExecPlanNodeVisitor::visit(RetrievePlanNode& node) {
DEAFULT_QUERY_ID, segment, active_count, timestamp_);

// Do task execution
auto bitset_holder = ExecuteTask(plan, query_context);
auto result = ExecuteTask2(plan, query_context);
setupRetrieveResult(result,
query_context,
node,
retrieve_result,
segment);
}

// Store result
// Publishes the outcome of a retrieve task into retrieve_result_opt_.
//
// - Count queries (node.is_count_): the retrieve result is taken from
//   `query_context`; `result` is not inspected.
// - Otherwise `result` must be a non-null row vector with at least one
//   column, and its first column must be a ColumnVector. When that column is
//   a bitmap, it is viewed as a bitset, segment->find_first(node.limit_, ...)
//   selects the result offsets, and `tmp_retrieve_result` is filled in and
//   moved into retrieve_result_opt_. Non-bitmap columns are not handled yet.
void ExecPlanNodeVisitor::setupRetrieveResult(const milvus::RowVectorPtr &result,
                                              const std::shared_ptr<milvus::exec::QueryContext> query_context,
                                              const RetrievePlanNode& node,
                                              RetrieveResult& tmp_retrieve_result,
                                              const segcore::SegmentInternalInterface* segment) {
    if (node.is_count_) {
        retrieve_result_opt_ = std::move(query_context->get_retrieve_result());
        return;
    }

    // The task may yield no batch at all, in which case `result` is null.
    AssertInfo(result != nullptr, "Result row vector must not be null");
    // child(0) is read below, so an empty row vector is the error case.
    AssertInfo(!result->childrens().empty(), "Result row vector must have at least one column");
    auto column_vec = std::dynamic_pointer_cast<ColumnVector>(result->child(0));
    AssertInfo(column_vec, "children inside row vector must be of column vector for now");
    if (column_vec->IsBitmap()){
        BitsetTypeView view(column_vec->GetRawData(), column_vec->size());
        tmp_retrieve_result.total_data_cnt_ = column_vec->size();
        tracer::AutoSpan _("Find Limit Pk", tracer::GetRootSpan());
        auto results_pair = segment->find_first(node.limit_, view);
        tmp_retrieve_result.result_offsets_ = std::move(results_pair.first);
        tmp_retrieve_result.has_more_result = results_pair.second;
        retrieve_result_opt_ = std::move(tmp_retrieve_result);
    } else {
        // load data in the result vector into retrieve_result
        // TODO: not implemented yet; retrieve_result_opt_ is left untouched
        // on this path.
    }
}

Expand Down
11 changes: 11 additions & 0 deletions internal/core/src/query/ExecPlanNodeVisitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,17 @@ class ExecPlanNodeVisitor : public PlanNodeVisitor {
ExecuteTask(plan::PlanFragment& plan,
std::shared_ptr<milvus::exec::QueryContext> query_context);

// Executes `plan` as a task and returns everything it produces as a single
// row vector: the first batch returned by the task is kept, and the columns
// of each later batch are appended to the matching columns of that first
// batch. Returns nullptr when the task produces no batch.
static RowVectorPtr
ExecuteTask2(plan::PlanFragment& plan,
std::shared_ptr<milvus::exec::QueryContext> query_context);

// Stores the outcome of a retrieve task into retrieve_result_opt_.
// For count queries (node.is_count_) the retrieve result is taken from
// `query_context`. Otherwise the first column of `result` is expected to be a
// ColumnVector; when it is a bitmap, segment->find_first(node.limit_, ...) is
// used to fill `tmp_retrieve_result`, which is then moved into
// retrieve_result_opt_. Non-bitmap result columns are not handled yet.
void
setupRetrieveResult(const RowVectorPtr& result,
const std::shared_ptr<milvus::exec::QueryContext> query_context,
const RetrievePlanNode& node,
RetrieveResult& tmp_retrieve_result,
const segcore::SegmentInternalInterface* segment);

private:
template <typename VectorType>
void
Expand Down

0 comments on commit 9b6792e

Please sign in to comment.