Skip to content

Commit

Permalink
Merge branch 'master' into master-catalog-auth
Browse files Browse the repository at this point in the history
  • Loading branch information
CalvinKirs authored Dec 27, 2024
2 parents f0b82df + ec29e7d commit 9992c7f
Show file tree
Hide file tree
Showing 123 changed files with 8,043 additions and 411 deletions.
53 changes: 39 additions & 14 deletions be/src/olap/push_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,16 @@
#include <new>
#include <queue>
#include <shared_mutex>
#include <type_traits>

#include "common/compiler_util.h" // IWYU pragma: keep
#include "common/config.h"
#include "common/logging.h"
#include "common/status.h"
#include "io/hdfs_builder.h"
#include "olap/delete_handler.h"
#include "olap/olap_define.h"
#include "olap/rowset/pending_rowset_helper.h"
#include "olap/rowset/rowset_meta.h"
#include "olap/rowset/rowset_writer.h"
#include "olap/rowset/rowset_writer_context.h"
#include "olap/schema.h"
Expand All @@ -53,10 +54,11 @@
#include "olap/txn_manager.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "util/runtime_profile.h"
#include "util/time.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/data_types/data_type_factory.hpp"
#include "vec/data_types/data_type_nullable.h"
#include "vec/exec/format/parquet/vparquet_reader.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/functions/simple_function_factory.h"
Expand Down Expand Up @@ -352,8 +354,12 @@ PushBrokerReader::PushBrokerReader(const Schema* schema, const TBrokerScanRange&
_file_params.expr_of_dest_slot = _params.expr_of_dest_slot;
_file_params.dest_sid_to_src_sid_without_trans = _params.dest_sid_to_src_sid_without_trans;
_file_params.strict_mode = _params.strict_mode;
_file_params.__isset.broker_addresses = true;
_file_params.broker_addresses = t_scan_range.broker_addresses;
if (_ranges[0].file_type == TFileType::FILE_HDFS) {
_file_params.hdfs_params = parse_properties(_params.properties);
} else {
_file_params.__isset.broker_addresses = true;
_file_params.broker_addresses = t_scan_range.broker_addresses;
}

for (const auto& range : _ranges) {
TFileRangeDesc file_range;
Expand Down Expand Up @@ -482,17 +488,36 @@ Status PushBrokerReader::_cast_to_input_block() {
auto& arg = _src_block_ptr->get_by_name(slot_desc->col_name());
// remove nullable here, let the get_function decide whether nullable
auto return_type = slot_desc->get_data_type_ptr();
vectorized::ColumnsWithTypeAndName arguments {
arg,
{vectorized::DataTypeString().create_column_const(
arg.column->size(), remove_nullable(return_type)->get_family_name()),
std::make_shared<vectorized::DataTypeString>(), ""}};
auto func_cast = vectorized::SimpleFunctionFactory::instance().get_function(
"CAST", arguments, return_type);
idx = _src_block_name_to_idx[slot_desc->col_name()];
RETURN_IF_ERROR(
func_cast->execute(nullptr, *_src_block_ptr, {idx}, idx, arg.column->size()));
_src_block_ptr->get_by_position(idx).type = std::move(return_type);
// bitmap convert:src -> to_base64 -> bitmap_from_base64
if (slot_desc->type().is_bitmap_type()) {
auto base64_return_type = vectorized::DataTypeFactory::instance().create_data_type(
vectorized::DataTypeString().get_type_as_type_descriptor(),
slot_desc->is_nullable());
auto func_to_base64 = vectorized::SimpleFunctionFactory::instance().get_function(
"to_base64", {arg}, base64_return_type);
RETURN_IF_ERROR(func_to_base64->execute(nullptr, *_src_block_ptr, {idx}, idx,
arg.column->size()));
_src_block_ptr->get_by_position(idx).type = std::move(base64_return_type);
auto& arg_base64 = _src_block_ptr->get_by_name(slot_desc->col_name());
auto func_bitmap_from_base64 =
vectorized::SimpleFunctionFactory::instance().get_function(
"bitmap_from_base64", {arg_base64}, return_type);
RETURN_IF_ERROR(func_bitmap_from_base64->execute(nullptr, *_src_block_ptr, {idx}, idx,
arg_base64.column->size()));
_src_block_ptr->get_by_position(idx).type = std::move(return_type);
} else {
vectorized::ColumnsWithTypeAndName arguments {
arg,
{vectorized::DataTypeString().create_column_const(
arg.column->size(), remove_nullable(return_type)->get_family_name()),
std::make_shared<vectorized::DataTypeString>(), ""}};
auto func_cast = vectorized::SimpleFunctionFactory::instance().get_function(
"CAST", arguments, return_type);
RETURN_IF_ERROR(
func_cast->execute(nullptr, *_src_block_ptr, {idx}, idx, arg.column->size()));
_src_block_ptr->get_by_position(idx).type = std::move(return_type);
}
}
return Status::OK();
}
Expand Down
4 changes: 2 additions & 2 deletions be/src/olap/task/engine_clone_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -813,15 +813,15 @@ Status EngineCloneTask::_finish_clone(Tablet* tablet, const std::string& clone_d
/// Traverse all downloaded clone files in CLONE dir.
/// If it does not exist in local tablet dir, link the file to local tablet dir
/// And save all linked files in linked_success_files.
/// if binlog exist in clone dir and md5sum equal, then skip link file
bool skip_link_file = false;
for (const string& clone_file : clone_file_names) {
if (local_file_names.find(clone_file) != local_file_names.end()) {
VLOG_NOTICE << "find same file when clone, skip it. "
<< "tablet=" << tablet->tablet_id() << ", clone_file=" << clone_file;
continue;
}

/// if binlog exist in clone dir and md5sum equal, then skip link file
bool skip_link_file = false;
std::string to;
if (clone_file.ends_with(".binlog") || clone_file.ends_with(".binlog-index")) {
if (!contain_binlog) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -641,6 +641,10 @@ public class Config extends ConfigBase {
@ConfField(description = {"Yarn 配置文件的路径", "Yarn config path"})
public static String yarn_config_dir = System.getenv("DORIS_HOME") + "/lib/yarn-config";

@ConfField(mutable = true, masterOnly = true, description = {"Ingestion load 的默认超时时间,单位是秒。",
"Default timeout for ingestion load job, in seconds."})
public static int ingestion_load_default_timeout_second = 86400; // 1 day

@ConfField(mutable = true, masterOnly = true, description = {"Sync job 的最大提交间隔,单位是秒。",
"Maximal intervals between two sync job's commits."})
public static long sync_commit_interval_second = 10;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -371,14 +371,17 @@ public static class EtlIndex implements Serializable {
public String indexType;
@SerializedName(value = "isBaseIndex")
public boolean isBaseIndex;
@SerializedName(value = "schemaVersion")
public int schemaVersion;

public EtlIndex(long indexId, List<EtlColumn> etlColumns, int schemaHash,
String indexType, boolean isBaseIndex) {
String indexType, boolean isBaseIndex, int schemaVersion) {
this.indexId = indexId;
this.columns = etlColumns;
this.schemaHash = schemaHash;
this.indexType = indexType;
this.isBaseIndex = isBaseIndex;
this.schemaVersion = schemaVersion;
}

public EtlColumn getColumn(String name) {
Expand All @@ -398,6 +401,7 @@ public String toString() {
+ ", schemaHash=" + schemaHash
+ ", indexType='" + indexType + '\''
+ ", isBaseIndex=" + isBaseIndex
+ ", schemaVersion=" + schemaVersion
+ '}';
}
}
Expand Down
42 changes: 21 additions & 21 deletions fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -203,15 +203,24 @@ supportedCreateStatement

supportedAlterStatement
: ALTER VIEW name=multipartIdentifier (LEFT_PAREN cols=simpleColumnDefs RIGHT_PAREN)?
AS query #alterView
| ALTER STORAGE VAULT name=multipartIdentifier properties=propertyClause #alterStorageVault
| ALTER ROLE role=identifier commentSpec #alterRole
AS query #alterView
| ALTER STORAGE VAULT name=multipartIdentifier properties=propertyClause #alterStorageVault
| ALTER ROLE role=identifier commentSpec #alterRole
| ALTER WORKLOAD GROUP name=identifierOrText
properties=propertyClause? #alterWorkloadGroup
properties=propertyClause? #alterWorkloadGroup
| ALTER WORKLOAD POLICY name=identifierOrText
properties=propertyClause? #alterWorkloadPolicy
| ALTER SQL_BLOCK_RULE name=identifier properties=propertyClause? #alterSqlBlockRule
| ALTER CATALOG name=identifier MODIFY COMMENT comment=STRING_LITERAL #alterCatalogComment
properties=propertyClause? #alterWorkloadPolicy
| ALTER SQL_BLOCK_RULE name=identifier properties=propertyClause? #alterSqlBlockRule
| ALTER CATALOG name=identifier MODIFY COMMENT comment=STRING_LITERAL #alterCatalogComment
| ALTER ROLE role=identifier commentSpec #alterRole
| ALTER TABLE tableName=multipartIdentifier
alterTableClause (COMMA alterTableClause)* #alterTable
| ALTER TABLE tableName=multipartIdentifier ADD ROLLUP
addRollupClause (COMMA addRollupClause)* #alterTableAddRollup
| ALTER TABLE tableName=multipartIdentifier DROP ROLLUP
dropRollupClause (COMMA dropRollupClause)* #alterTableDropRollup
| ALTER TABLE name=multipartIdentifier
SET LEFT_PAREN propertyItemList RIGHT_PAREN #alterTableProperties
;

supportedDropStatement
Expand Down Expand Up @@ -578,13 +587,7 @@ privilegeList
;

unsupportedAlterStatement
: ALTER TABLE tableName=multipartIdentifier
alterTableClause (COMMA alterTableClause)* #alterTable
| ALTER TABLE tableName=multipartIdentifier ADD ROLLUP
addRollupClause (COMMA addRollupClause)* #alterTableAddRollup
| ALTER TABLE tableName=multipartIdentifier DROP ROLLUP
dropRollupClause (COMMA dropRollupClause)* #alterTableDropRollup
| ALTER SYSTEM alterSystemClause #alterSystem
: ALTER SYSTEM alterSystemClause #alterSystem
| ALTER DATABASE name=identifier SET (DATA |REPLICA | TRANSACTION)
QUOTA INTEGER_VALUE identifier? #alterDatabaseSetQuota
| ALTER DATABASE name=identifier RENAME newName=identifier #alterDatabaseRename
Expand All @@ -598,8 +601,6 @@ unsupportedAlterStatement
SET LEFT_PAREN propertyItemList RIGHT_PAREN #alterColocateGroup
| ALTER ROUTINE LOAD FOR name=multipartIdentifier properties=propertyClause?
(FROM type=identifier LEFT_PAREN propertyItemList RIGHT_PAREN)? #alterRoutineLoad
| ALTER TABLE name=multipartIdentifier
SET LEFT_PAREN propertyItemList RIGHT_PAREN #alterTableProperties
| ALTER STORAGE POLICY name=identifierOrText
properties=propertyClause #alterStoragePlicy
| ALTER USER (IF EXISTS)? grantUserIdentify
Expand Down Expand Up @@ -643,20 +644,19 @@ addRollupClause

alterTableClause
: ADD COLUMN columnDef columnPosition? toRollup? properties=propertyClause? #addColumnClause
| ADD COLUMN LEFT_PAREN columnDef (COMMA columnDef)* RIGHT_PAREN
| ADD COLUMN LEFT_PAREN columnDefs RIGHT_PAREN
toRollup? properties=propertyClause? #addColumnsClause
| DROP COLUMN name=identifier fromRollup? properties=propertyClause? #dropColumnClause
| MODIFY COLUMN columnDef columnPosition? fromRollup?
properties=propertyClause? #modifyColumnClause
| ORDER BY identifierList fromRollup? properties=propertyClause? #reorderColumnsClause
| ADD TEMPORARY? (lessThanPartitionDef | fixedPartitionDef | inPartitionDef)
(LEFT_PAREN partitionProperties=propertyItemList RIGHT_PAREN)?
| ADD TEMPORARY? partitionDef
(DISTRIBUTED BY (HASH hashKeys=identifierList | RANDOM)
(BUCKETS (INTEGER_VALUE | autoBucket=AUTO))?)?
properties=propertyClause? #addPartitionClause
| DROP TEMPORARY? PARTITION (IF EXISTS)? partitionName=identifier FORCE?
(FROM INDEX indexName=identifier)? #dropPartitionClause
| MODIFY TEMPORARY? PARTITION (IF EXISTS)?
| MODIFY TEMPORARY? PARTITION
(partitionName=identifier | partitionNames=identifierList
| LEFT_PAREN ASTERISK RIGHT_PAREN)
SET LEFT_PAREN partitionProperties=propertyItemList RIGHT_PAREN #modifyPartitionClause
Expand Down Expand Up @@ -1383,7 +1383,7 @@ indexDefs
;

indexDef
: INDEX (IF NOT EXISTS)? indexName=identifier cols=identifierList (USING indexType=(BITMAP | INVERTED | NGRAM_BF))? (PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)? (COMMENT comment=STRING_LITERAL)?
: INDEX (ifNotExists=IF NOT EXISTS)? indexName=identifier cols=identifierList (USING indexType=(BITMAP | INVERTED | NGRAM_BF))? (PROPERTIES LEFT_PAREN properties=propertyItemList RIGHT_PAREN)? (COMMENT comment=STRING_LITERAL)?
;

partitionsDef
Expand Down
Loading

0 comments on commit 9992c7f

Please sign in to comment.