Skip to content

Commit

Permalink
[Feature](partitions) Support auto-partition (apache#24153)
Browse files Browse the repository at this point in the history
Co-authored-by: zhangstar333 <[email protected]>
  • Loading branch information
zclllyybb and zhangstar333 authored Sep 12, 2023
1 parent 4bb9a12 commit d3f1388
Show file tree
Hide file tree
Showing 29 changed files with 1,630 additions and 565 deletions.
3 changes: 3 additions & 0 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ E(ALREADY_CANCELLED, -237);
E(TOO_MANY_SEGMENTS, -238);
E(ALREADY_CLOSED, -239);
E(SERVICE_UNAVAILABLE, -240);
E(NEED_SEND_AGAIN, -241);
E(CE_CMD_PARAMS_ERROR, -300);
E(CE_BUFFER_TOO_SMALL, -301);
E(CE_CMD_NOT_VALID, -302);
Expand Down Expand Up @@ -285,6 +286,7 @@ constexpr bool capture_stacktrace(int code) {
&& code != ErrorCode::TOO_MANY_VERSION
&& code != ErrorCode::ALREADY_CANCELLED
&& code != ErrorCode::ALREADY_CLOSED
&& code != ErrorCode::NEED_SEND_AGAIN
&& code != ErrorCode::PUSH_TRANSACTION_ALREADY_EXIST
&& code != ErrorCode::BE_NO_SUITABLE_VERSION
&& code != ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION
Expand Down Expand Up @@ -428,6 +430,7 @@ class Status {
ERROR_CTOR(DataQualityError, DATA_QUALITY_ERROR)
ERROR_CTOR(NotAuthorized, NOT_AUTHORIZED)
ERROR_CTOR(HttpError, HTTP_ERROR)
ERROR_CTOR(NeedSendAgain, NEED_SEND_AGAIN)
#undef ERROR_CTOR

template <int code>
Expand Down
282 changes: 223 additions & 59 deletions be/src/exec/tablet_info.cpp

Large diffs are not rendered by default.

90 changes: 63 additions & 27 deletions be/src/exec/tablet_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
#include <map>
#include <memory>
#include <string>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <vector>
Expand All @@ -35,6 +36,8 @@
#include "vec/columns/column.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/exprs/vexpr_fwd.h"

namespace doris {
Expand Down Expand Up @@ -110,7 +113,8 @@ using OlapTableIndexTablets = TOlapTableIndexTablets;
// }

using BlockRow = std::pair<vectorized::Block*, int32_t>;
using VecBlock = vectorized::Block;
using BlockRowWithIndicator =
std::tuple<vectorized::Block*, int32_t, bool>; // [block, column, is_transformed]

struct VOlapTablePartition {
int64_t id = 0;
Expand All @@ -125,32 +129,20 @@ struct VOlapTablePartition {
: start_key {partition_block, -1}, end_key {partition_block, -1} {}
};

// this is only used by tablet_sink. so we can assume it's inited by its' descriptor.
class VOlapTablePartKeyComparator {
public:
VOlapTablePartKeyComparator(const std::vector<uint16_t>& slot_locs) : _slot_locs(slot_locs) {}
VOlapTablePartKeyComparator(const std::vector<uint16_t>& slot_locs,
const std::vector<uint16_t>& params_locs)
: _slot_locs(slot_locs), _param_locs(params_locs) {}

// return true if lhs < rhs
// 'row' is -1 mean
bool operator()(const BlockRow* lhs, const BlockRow* rhs) const {
if (lhs->second == -1) {
return false;
} else if (rhs->second == -1) {
return true;
}

for (auto slot_loc : _slot_locs) {
auto res = lhs->first->get_by_position(slot_loc).column->compare_at(
lhs->second, rhs->second, *rhs->first->get_by_position(slot_loc).column, -1);
if (res != 0) {
return res < 0;
}
}
// equal, return false
return false;
}
// 'row' is -1 mean maximal boundary
bool operator()(const BlockRowWithIndicator lhs, const BlockRowWithIndicator rhs) const;

private:
const std::vector<uint16_t>& _slot_locs;
const std::vector<uint16_t>& _param_locs;
};

// store an olap table's tablet information
Expand All @@ -174,6 +166,26 @@ class VOlapTablePartitionParam {

const std::vector<VOlapTablePartition*>& get_partitions() const { return _partitions; }

// it's same with auto now because we only support transformed partition in auto partition. may expand in future
bool is_projection_partition() const { return _is_auto_partiton; }
bool is_auto_partition() const { return _is_auto_partiton; }

std::vector<uint16_t> get_partition_keys() const { return _partition_slot_locs; }

Status add_partitions(const std::vector<TOlapTablePartition>& partitions);

//TODO: use vector when we support multi partition column for auto-partition
vectorized::VExprContextSPtr get_part_func_ctx() { return _part_func_ctx; }
vectorized::VExprSPtr get_partition_function() { return _partition_function; }

// which will affect _partition_block
Status generate_partition_from(const TOlapTablePartition& t_part,
VOlapTablePartition*& part_result);

void set_transformed_slots(const std::vector<uint16_t>& new_slots) {
_transformed_slot_locs = new_slots;
}

private:
Status _create_partition_keys(const std::vector<TExprNode>& t_exprs, BlockRow* part_key);

Expand All @@ -182,33 +194,40 @@ class VOlapTablePartitionParam {
std::function<uint32_t(BlockRow*, int64_t)> _compute_tablet_index;

// check if this partition contain this key
bool _part_contains(VOlapTablePartition* part, BlockRow* key) const {
// start_key.second == -1 means only single partition
VOlapTablePartKeyComparator comparator(_partition_slot_locs);
return part->start_key.second == -1 || !comparator(key, &part->start_key);
}
bool _part_contains(VOlapTablePartition* part, BlockRowWithIndicator key) const;

// this partition only valid in this schema
std::shared_ptr<OlapTableSchemaParam> _schema;
TOlapTablePartitionParam _t_param;

const std::vector<SlotDescriptor*>& _slots;
std::vector<uint16_t> _partition_slot_locs;
std::vector<uint16_t> _transformed_slot_locs;
std::vector<uint16_t> _distributed_slot_locs;

ObjectPool _obj_pool;
vectorized::Block _partition_block;
std::unique_ptr<MemTracker> _mem_tracker;
std::vector<VOlapTablePartition*> _partitions;
std::unique_ptr<std::map<BlockRow*, VOlapTablePartition*, VOlapTablePartKeyComparator>>
// For all partition value rows saved in this map, indicator is false. whenever we use a value to find in it, the param is true.
// so that we can distinguish which column index to use (origin slots or transformed slots).
std::unique_ptr<
std::map<BlockRowWithIndicator, VOlapTablePartition*, VOlapTablePartKeyComparator>>
_partitions_map;

bool _is_in_partition = false;
uint32_t _mem_usage = 0;
// only works when using list partition, the resource is owned by _partitions
VOlapTablePartition* _default_partition = nullptr;

// for auto partition, now only support 1 column. TODO: use vector to save them when we support multi column auto-partition.
bool _is_auto_partiton = false;
vectorized::VExprContextSPtr _part_func_ctx = nullptr;
vectorized::VExprSPtr _partition_function = nullptr;
TPartitionType::type _part_type; // support list or range
};

// indicate where's the tablet and all its replications (node-wise)
using TabletLocation = TTabletLocation;
// struct TTabletLocation {
// 1: required i64 tablet_id
Expand All @@ -235,9 +254,17 @@ class OlapTableLocationParam {
return nullptr;
}

void add_locations(std::vector<TTabletLocation>& locations) {
for (auto& location : locations) {
if (_tablets.find(location.tablet_id) == _tablets.end()) {
_tablets[location.tablet_id] = &location;
}
}
}

private:
TOlapTableLocationParam _t_param;

// [tablet_id, tablet]. tablet has id, also.
std::unordered_map<int64_t, TabletLocation*> _tablets;
};

Expand Down Expand Up @@ -278,6 +305,15 @@ class DorisNodesInfo {
return nullptr;
}

void add_nodes(const std::vector<TNodeInfo>& t_nodes) {
for (const auto& node : t_nodes) {
auto node_info = find_node(node.id);
if (node_info == nullptr) {
_nodes.emplace(node.id, node);
}
}
}

const std::unordered_map<int64_t, NodeInfo>& nodes_info() { return _nodes; }

private:
Expand Down
84 changes: 1 addition & 83 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,89 +193,7 @@ PFilterType get_type(RuntimeFilterType type) {
}

Status create_literal(const TypeDescriptor& type, const void* data, vectorized::VExprSPtr& expr) {
TExprNode node;

switch (type.type) {
case TYPE_BOOLEAN: {
create_texpr_literal_node<TYPE_BOOLEAN>(data, &node);
break;
}
case TYPE_TINYINT: {
create_texpr_literal_node<TYPE_TINYINT>(data, &node);
break;
}
case TYPE_SMALLINT: {
create_texpr_literal_node<TYPE_SMALLINT>(data, &node);
break;
}
case TYPE_INT: {
create_texpr_literal_node<TYPE_INT>(data, &node);
break;
}
case TYPE_BIGINT: {
create_texpr_literal_node<TYPE_BIGINT>(data, &node);
break;
}
case TYPE_LARGEINT: {
create_texpr_literal_node<TYPE_LARGEINT>(data, &node);
break;
}
case TYPE_FLOAT: {
create_texpr_literal_node<TYPE_FLOAT>(data, &node);
break;
}
case TYPE_DOUBLE: {
create_texpr_literal_node<TYPE_DOUBLE>(data, &node);
break;
}
case TYPE_DATEV2: {
create_texpr_literal_node<TYPE_DATEV2>(data, &node);
break;
}
case TYPE_DATETIMEV2: {
create_texpr_literal_node<TYPE_DATETIMEV2>(data, &node);
break;
}
case TYPE_DATE: {
create_texpr_literal_node<TYPE_DATE>(data, &node);
break;
}
case TYPE_DATETIME: {
create_texpr_literal_node<TYPE_DATETIME>(data, &node);
break;
}
case TYPE_DECIMALV2: {
create_texpr_literal_node<TYPE_DECIMALV2>(data, &node, type.precision, type.scale);
break;
}
case TYPE_DECIMAL32: {
create_texpr_literal_node<TYPE_DECIMAL32>(data, &node, type.precision, type.scale);
break;
}
case TYPE_DECIMAL64: {
create_texpr_literal_node<TYPE_DECIMAL64>(data, &node, type.precision, type.scale);
break;
}
case TYPE_DECIMAL128I: {
create_texpr_literal_node<TYPE_DECIMAL128I>(data, &node, type.precision, type.scale);
break;
}
case TYPE_CHAR: {
create_texpr_literal_node<TYPE_CHAR>(data, &node);
break;
}
case TYPE_VARCHAR: {
create_texpr_literal_node<TYPE_VARCHAR>(data, &node);
break;
}
case TYPE_STRING: {
create_texpr_literal_node<TYPE_STRING>(data, &node);
break;
}
default:
DCHECK(false);
return Status::InvalidArgument("Invalid type!");
}
TExprNode node = create_texpr_node_from(data, type.type, type.precision, type.scale);

try {
expr = vectorized::VLiteral::create_shared(node);
Expand Down
Loading

0 comments on commit d3f1388

Please sign in to comment.