Skip to content

Commit

Permalink
update with crchash
Browse files Browse the repository at this point in the history
  • Loading branch information
zhangstar333 committed Mar 11, 2024
1 parent 77f3bda commit fb354d4
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 25 deletions.
21 changes: 12 additions & 9 deletions be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,6 @@ Status ExchangeSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& inf
fmt::format("Crc32HashPartitioner({})", _partition_count));
} else if (_part_type == TPartitionType::TABLET_SINK_SHUFFLE_PARTITIONED) {
_partition_count = channels.size();
LOG(INFO) << "TABLET_SINK_SHUFFLE_PARTITIONED: " << _partition_count << " "
<< print_id(state->query_id());
_partitioner.reset(
new vectorized::Crc32HashPartitioner<LocalExchangeChannelIds>(channels.size()));
RETURN_IF_ERROR(_partitioner->init(p._texprs));
RETURN_IF_ERROR(_partitioner->prepare(state, p._row_desc));
_profile->add_info_string("Partitioner",
fmt::format("Crc32HashPartitioner({})", _partition_count));
_txn_id = p._tablet_sink_txn_id;
Expand Down Expand Up @@ -484,10 +478,19 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, vectorized::Block* block

const auto& row_ids = local_state._row_part_tablet_ids[0].row_ids;
const auto& tablet_ids = local_state._row_part_tablet_ids[0].tablet_ids;
for (int idx = 0; idx < row_ids.size(); ++idx) {
auto select_rows = row_ids.size();
std::vector<uint32_t> crc_hash_vals(select_rows);
auto* __restrict crc_hashes = crc_hash_vals.data();
auto column_int64 = vectorized::ColumnVector<int64>::create();

column_int64->insert_many_in_copy_way(reinterpret_cast<const char*>(tablet_ids.data()),
select_rows);
column_int64->update_crcs_with_value(crc_hashes, PrimitiveType::TYPE_BIGINT,
select_rows, 0, nullptr);
for (int idx = 0; idx < select_rows; ++idx) {
const auto& row = row_ids[idx];
const auto& tablet_id = tablet_ids[idx];
channel2rows[tablet_id % num_channels].emplace_back(row);
const auto& tablet_id_hash = crc_hash_vals[idx];
channel2rows[tablet_id_hash % num_channels].emplace_back(row);
}
}

Expand Down
15 changes: 12 additions & 3 deletions be/src/vec/sink/vdata_stream_sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -703,10 +703,19 @@ Status VDataStreamSender::send(RuntimeState* state, Block* block, bool eos) {

const auto& row_ids = _row_part_tablet_ids[0].row_ids;
const auto& tablet_ids = _row_part_tablet_ids[0].tablet_ids;
for (int idx = 0; idx < row_ids.size(); ++idx) {
auto select_rows = row_ids.size();
std::vector<uint32_t> crc_hash_vals(select_rows);
auto* __restrict crc_hashes = crc_hash_vals.data();
auto column_int64 = vectorized::ColumnVector<int64>::create();

column_int64->insert_many_in_copy_way(reinterpret_cast<const char*>(tablet_ids.data()),
select_rows);
column_int64->update_crcs_with_value(crc_hashes, PrimitiveType::TYPE_BIGINT,
select_rows, 0, nullptr);
for (int idx = 0; idx < select_rows; ++idx) {
const auto& row = row_ids[idx];
const auto& tablet_id = tablet_ids[idx];
channel2rows[tablet_id % num_channels].emplace_back(row);
const auto& tablet_id_hash = crc_hash_vals[idx];
channel2rows[tablet_id_hash % num_channels].emplace_back(row);
}
}
if (eos) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -848,7 +848,7 @@ public class SessionVariable implements Serializable, Writable {
public boolean enableNereidsDmlWithPipeline = true;

@VariableMgr.VarAttr(name = ENABLE_STRICT_CONSISTENCY_DML, needForward = true)
public boolean enableStrictConsistencyDml = false;
public boolean enableStrictConsistencyDml = true;

@VariableMgr.VarAttr(name = ENABLE_VECTORIZED_ENGINE, varType = VariableAnnotation.EXPERIMENTAL_ONLINE)
public boolean enableVectorizedEngine = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,18 +26,6 @@ suite('nereids_insert_no_partition') {
sql 'set enable_nereids_dml=true'
sql 'set enable_strict_consistency_dml=true'

sql 'set experimental_enable_nereids_dml_with_pipeline=false'
explain {
// TODO: test turn off pipeline when dml, remove it if pipeline sink is ok
sql '''
insert into uni_light_sc_mow_not_null_nop_t with t as(
select * except(kaint, kmintint, kjson) from src where id is not null)
select * from t left semi join t t2 on t.id = t2.id;
'''

notContains("MultiCastDataSinks")
}

sql '''insert into agg_nop_t
select * except(kaint, kmintint, kjson) from src'''
sql 'sync'
Expand Down

0 comments on commit fb354d4

Please sign in to comment.