From b53d0e404af30b9f2ed05f76aba129b4365d57f1 Mon Sep 17 00:00:00 2001 From: Guangdong Liu Date: Wed, 9 Oct 2024 09:47:31 +0800 Subject: [PATCH] [bugfix](arrow) Fix time zone issues and accuracy issues (#38215) Issue Number: close #38174 --- .../exec/memory_scratch_sink_operator.cpp | 2 +- be/src/pipeline/exec/result_sink_operator.cpp | 4 ++- be/src/util/arrow/row_batch.cpp | 32 +++++++++++-------- be/src/util/arrow/row_batch.h | 9 ++++-- be/src/vec/runtime/vparquet_transformer.cpp | 3 +- .../data/arrow_flight_sql_p0/test_select.out | 4 +++ regression-test/framework/pom.xml | 2 +- .../arrow_flight_sql_p0/test_select.groovy | 12 +++++++ 8 files changed, 47 insertions(+), 21 deletions(-) diff --git a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp index b9f18c43e1e239b..262a5ef538e90bb 100644 --- a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp +++ b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp @@ -104,7 +104,7 @@ Status MemoryScratchSinkOperatorX::sink(RuntimeState* state, vectorized::Block* { SCOPED_TIMER(local_state._get_arrow_schema_timer); // After expr executed, use recaculated schema as final schema - RETURN_IF_ERROR(get_arrow_schema(block, &block_arrow_schema)); + RETURN_IF_ERROR(get_arrow_schema(block, &block_arrow_schema, state->timezone())); } { SCOPED_TIMER(local_state._convert_block_to_arrow_batch_timer); diff --git a/be/src/pipeline/exec/result_sink_operator.cpp b/be/src/pipeline/exec/result_sink_operator.cpp index b8faa4f76f7a30b..15612168affd898 100644 --- a/be/src/pipeline/exec/result_sink_operator.cpp +++ b/be/src/pipeline/exec/result_sink_operator.cpp @@ -18,6 +18,7 @@ #include "result_sink_operator.h" #include +#include #include @@ -80,7 +81,8 @@ Status ResultSinkLocalState::open(RuntimeState* state) { } case TResultSinkType::ARROW_FLIGHT_PROTOCAL: { std::shared_ptr arrow_schema; - RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema)); + RETURN_IF_ERROR(convert_expr_ctxs_arrow_schema(_output_vexpr_ctxs, &arrow_schema, + state->timezone())); if (state->query_options().enable_parallel_result_sink) { state->exec_env()->result_mgr()->register_arrow_schema(state->query_id(), arrow_schema); } else { diff --git a/be/src/util/arrow/row_batch.cpp b/be/src/util/arrow/row_batch.cpp index 084765e5aaa9c02..308986df68e1d2d 100644 --- a/be/src/util/arrow/row_batch.cpp +++ b/be/src/util/arrow/row_batch.cpp @@ -46,7 +46,8 @@ namespace doris { using strings::Substitute; -Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr* result) { +Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr* result, + const std::string& timezone) { switch (type.type) { case TYPE_NULL: *result = arrow::null(); @@ -96,11 +97,11 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr 3) { - *result = std::make_shared(arrow::TimeUnit::MICRO); + *result = std::make_shared(arrow::TimeUnit::MICRO, timezone); } else if (type.scale > 0) { - *result = std::make_shared(arrow::TimeUnit::MILLI); + *result = std::make_shared(arrow::TimeUnit::MILLI, timezone); } else { - *result = std::make_shared(arrow::TimeUnit::SECOND); + *result = std::make_shared(arrow::TimeUnit::SECOND, timezone); } break; case TYPE_DECIMALV2: @@ -120,7 +121,7 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr item_type; - static_cast(convert_to_arrow_type(type.children[0], &item_type)); + static_cast(convert_to_arrow_type(type.children[0], &item_type, timezone)); *result = std::make_shared(item_type); break; } @@ -128,8 +129,8 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr key_type; std::shared_ptr val_type; - static_cast(convert_to_arrow_type(type.children[0], &key_type)); - static_cast(convert_to_arrow_type(type.children[1], &val_type)); + static_cast(convert_to_arrow_type(type.children[0], &key_type, timezone)); + static_cast(convert_to_arrow_type(type.children[1], &val_type, timezone)); *result = std::make_shared(key_type, val_type); break; } @@ -138,7 +139,7 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr> fields; for (size_t i = 0; i < type.children.size(); i++) { std::shared_ptr field_type; - static_cast(convert_to_arrow_type(type.children[i], &field_type)); + static_cast(convert_to_arrow_type(type.children[i], &field_type, timezone)); fields.push_back(std::make_shared(type.field_names[i], field_type, type.contains_nulls[i])); } @@ -156,19 +157,21 @@ Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr* field) { +Status convert_to_arrow_field(SlotDescriptor* desc, std::shared_ptr* field, + const std::string& timezone) { std::shared_ptr type; - RETURN_IF_ERROR(convert_to_arrow_type(desc->type(), &type)); + RETURN_IF_ERROR(convert_to_arrow_type(desc->type(), &type, timezone)); *field = arrow::field(desc->col_name(), type, desc->is_nullable()); return Status::OK(); } -Status get_arrow_schema(const vectorized::Block& block, std::shared_ptr* result) { +Status get_arrow_schema(const vectorized::Block& block, std::shared_ptr* result, + const std::string& timezone) { std::vector> fields; for (const auto& type_and_name : block) { std::shared_ptr arrow_type; RETURN_IF_ERROR(convert_to_arrow_type(type_and_name.type->get_type_as_type_descriptor(), - &arrow_type)); + &arrow_type, timezone)); fields.push_back(std::make_shared(type_and_name.name, arrow_type, type_and_name.type->is_nullable())); } @@ -177,12 +180,13 @@ Status get_arrow_schema(const vectorized::Block& block, std::shared_ptr* result) { + std::shared_ptr* result, + const std::string& timezone) { std::vector> fields; for (int i = 0; i < output_vexpr_ctxs.size(); i++) { std::shared_ptr arrow_type; auto root_expr = output_vexpr_ctxs.at(i)->root(); - RETURN_IF_ERROR(convert_to_arrow_type(root_expr->type(), &arrow_type)); + RETURN_IF_ERROR(convert_to_arrow_type(root_expr->type(), &arrow_type, timezone)); auto field_name = root_expr->is_slot_ref() && !root_expr->expr_label().empty() ? root_expr->expr_label() : fmt::format("{}_{}", root_expr->data_type()->get_name(), i); diff --git a/be/src/util/arrow/row_batch.h b/be/src/util/arrow/row_batch.h index 5dd76ff66d7ff8d..d036086902348e7 100644 --- a/be/src/util/arrow/row_batch.h +++ b/be/src/util/arrow/row_batch.h @@ -41,12 +41,15 @@ namespace doris { class RowDescriptor; -Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr* result); +Status convert_to_arrow_type(const TypeDescriptor& type, std::shared_ptr* result, + const std::string& timezone); -Status get_arrow_schema(const vectorized::Block& block, std::shared_ptr* result); +Status get_arrow_schema(const vectorized::Block& block, std::shared_ptr* result, + const std::string& timezone); Status convert_expr_ctxs_arrow_schema(const vectorized::VExprContextSPtrs& output_vexpr_ctxs, - std::shared_ptr* result); + std::shared_ptr* result, + const std::string& timezone); Status serialize_record_batch(const arrow::RecordBatch& record_batch, std::string* result); diff --git a/be/src/vec/runtime/vparquet_transformer.cpp b/be/src/vec/runtime/vparquet_transformer.cpp index 1969858349f0e90..f0810d6c7ceeade 100644 --- a/be/src/vec/runtime/vparquet_transformer.cpp +++ b/be/src/vec/runtime/vparquet_transformer.cpp @@ -266,7 +266,8 @@ Status VParquetTransformer::_parse_schema() { std::vector> fields; for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) { std::shared_ptr type; - RETURN_IF_ERROR(convert_to_arrow_type(_output_vexpr_ctxs[i]->root()->type(), &type)); + RETURN_IF_ERROR(convert_to_arrow_type(_output_vexpr_ctxs[i]->root()->type(), &type, + _state->timezone())); if (_parquet_schemas != nullptr) { std::shared_ptr field = arrow::field(_parquet_schemas->operator[](i).schema_column_name, type, diff --git a/regression-test/data/arrow_flight_sql_p0/test_select.out b/regression-test/data/arrow_flight_sql_p0/test_select.out index d643597bbafcb5e..f2f4b86bbf5ceb2 100644 --- a/regression-test/data/arrow_flight_sql_p0/test_select.out +++ b/regression-test/data/arrow_flight_sql_p0/test_select.out @@ -2,3 +2,7 @@ -- !arrow_flight_sql -- 777 4 +-- !arrow_flight_sql_datetime -- +333 plsql333 2024-07-21 12:00:00.123456 2024-07-21 12:00:00.0 +222 plsql222 2024-07-20 12:00:00.123456 2024-07-20 12:00:00.0 +111 plsql111 2024-07-19 12:00:00.123456 2024-07-19 12:00:00.0 \ No newline at end of file diff --git a/regression-test/framework/pom.xml b/regression-test/framework/pom.xml index aded781c08bb712..6b749bf0fd1dae6 100644 --- a/regression-test/framework/pom.xml +++ b/regression-test/framework/pom.xml @@ -379,7 +379,7 @@ under the License. org.apache.doris flink-doris-connector-1.16 - 1.6.1 + 24.0.0 diff --git a/regression-test/suites/arrow_flight_sql_p0/test_select.groovy b/regression-test/suites/arrow_flight_sql_p0/test_select.groovy index 55b3c301e244f4a..950fb4af7e90343 100644 --- a/regression-test/suites/arrow_flight_sql_p0/test_select.groovy +++ b/regression-test/suites/arrow_flight_sql_p0/test_select.groovy @@ -28,4 +28,16 @@ suite("test_select", "arrow_flight_sql") { sql """INSERT INTO ${tableName} VALUES(111, "plsql333")""" qt_arrow_flight_sql "select sum(id) as a, count(1) as b from ${tableName}" + + tableName = "test_select_datetime" + sql "DROP TABLE IF EXISTS ${tableName}" + sql """ + create table ${tableName} (id int, name varchar(20), f_datetime_p datetime(6), f_datetime datetime) DUPLICATE key(`id`) distributed by hash (`id`) buckets 4 + properties ("replication_num"="1"); + """ + sql """INSERT INTO ${tableName} VALUES(111, "plsql111","2024-07-19 12:00:00.123456","2024-07-19 12:00:00")""" + sql """INSERT INTO ${tableName} VALUES(222, "plsql222","2024-07-20 12:00:00.123456","2024-07-20 12:00:00")""" + sql """INSERT INTO ${tableName} VALUES(333, "plsql333","2024-07-21 12:00:00.123456","2024-07-21 12:00:00")""" + + qt_arrow_flight_sql_datetime "select * from ${tableName} order by id desc" }