Skip to content

Commit

Permalink
3
Browse files Browse the repository at this point in the history
  • Loading branch information
xinyiZzz committed Sep 13, 2023
1 parent 27d56d2 commit 2dc22e3
Show file tree
Hide file tree
Showing 10 changed files with 19 additions and 47 deletions.
7 changes: 0 additions & 7 deletions be/src/runtime/buffer_control_block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,23 +208,18 @@ void BufferControlBlock::get_batch(GetResultBatchCtx* ctx) {

Status BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>* result) {
std::unique_lock<std::mutex> l(_lock);
LOG(INFO) << "11111111 d " << _buffer_rows << ", " << _packet_num;
if (!_status.ok()) {
LOG(INFO) << "11111111 h " << _buffer_rows << ", " << _packet_num;
return _status;
}
if (_is_cancelled) {
LOG(INFO) << "11111111 g " << _buffer_rows << ", " << _packet_num;
return Status::Cancelled("Cancelled");
}

while (_arrow_flight_batch_queue.empty() && !_is_cancelled && !_is_close) {
LOG(INFO) << "11111111 e " << _buffer_rows << ", " << _packet_num;
_data_arrival.wait_for(l, std::chrono::seconds(1));
}

if (_is_cancelled) {
LOG(INFO) << "11111111 f " << _buffer_rows << ", " << _packet_num;
return Status::Cancelled("Cancelled");
}

Expand All @@ -234,13 +229,11 @@ Status BufferControlBlock::get_arrow_batch(std::shared_ptr<arrow::RecordBatch>*
_buffer_rows -= (*result)->num_rows();
_data_removal.notify_one();
_packet_num++;
LOG(INFO) << "11111111 c " << _buffer_rows << ", " << _packet_num;
return Status::OK();
}

// normal path end
if (_is_close) {
LOG(INFO) << "11111111 i " << _buffer_rows << ", " << _packet_num;
return Status::OK();
}
return Status::InternalError("Abnormal Ending");
Expand Down
8 changes: 2 additions & 6 deletions be/src/runtime/result_buffer_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size

{
std::unique_lock<std::shared_mutex> wlock(_buffer_map_lock);
LOG(INFO) << "11111111 9 " << query_id << ", " << print_id(query_id);
_buffer_map.insert(std::make_pair(query_id, control_block));
// BufferControlBlock should destroy after max_timeout
// for exceed max_timeout FE will return timeout to client
Expand All @@ -101,7 +100,6 @@ Status ResultBufferMgr::create_sender(const TUniqueId& query_id, int buffer_size

std::shared_ptr<BufferControlBlock> ResultBufferMgr::find_control_block(const TUniqueId& query_id) {
std::shared_lock<std::shared_mutex> rlock(_buffer_map_lock);
LOG(INFO) << "11111111 8 " << query_id;
BufferMap::iterator iter = _buffer_map.find(query_id);

if (_buffer_map.end() != iter) {
Expand Down Expand Up @@ -144,9 +142,7 @@ void ResultBufferMgr::fetch_data(const PUniqueId& finst_id, GetResultBatchCtx* c
Status ResultBufferMgr::fetch_arrow_data(const TUniqueId& finst_id,
std::shared_ptr<arrow::RecordBatch>* result) {
std::shared_ptr<BufferControlBlock> cb = find_control_block(finst_id);
LOG(INFO) << "11111111 7 " << finst_id;
if (cb == nullptr) {
LOG(INFO) << "11111111 j " << finst_id;
LOG(WARNING) << "no result for this query, id=" << print_id(finst_id);
return Status::InternalError("no result for this query");
}
Expand All @@ -173,13 +169,14 @@ Status ResultBufferMgr::cancel(const TUniqueId& query_id) {
_row_descriptor_map.erase(row_desc_iter);
}
}

return Status::OK();
}

Status ResultBufferMgr::cancel_at_time(time_t cancel_time, const TUniqueId& query_id) {
std::lock_guard<std::mutex> l(_timeout_lock);
TimeoutMap::iterator iter = _timeout_map.find(cancel_time);
LOG(INFO) << "11111111 3 " << query_id << ", " << cancel_time;

if (_timeout_map.end() == iter) {
_timeout_map.insert(
std::pair<time_t, std::vector<TUniqueId>>(cancel_time, std::vector<TUniqueId>()));
Expand Down Expand Up @@ -212,7 +209,6 @@ void ResultBufferMgr::cancel_thread() {

// cancel query
for (int i = 0; i < query_to_cancel.size(); ++i) {
LOG(INFO) << "11111111 4 " << query_to_cancel[i];
cancel(query_to_cancel[i]);
}
} while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(1)));
Expand Down
13 changes: 4 additions & 9 deletions be/src/service/arrow_flight/arrow_flight_batch_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ arrow::Result<std::shared_ptr<ArrowFlightBatchReader>> ArrowFlightBatchReader::C
ARROW_RETURN_NOT_OK(arrow::Status::Invalid(fmt::format(
"Schema RowDescriptor Not Found, queryid: {}", print_id(statement_->query_id))));
}
LOG(INFO) << "11111111 2 " << statement_->query_id;
std::shared_ptr<arrow::Schema> schema;
auto st = convert_to_arrow_schema(row_desc, &schema);
if (UNLIKELY(!st.ok())) {
Expand All @@ -58,20 +57,16 @@ arrow::Result<std::shared_ptr<ArrowFlightBatchReader>> ArrowFlightBatchReader::C
}

arrow::Status ArrowFlightBatchReader::ReadNext(std::shared_ptr<arrow::RecordBatch>* out) {
// CHECK(*out == nullptr); // not nullptr
LOG(INFO) << "11111111 1 " << statement_->query_id;
// *out not nullptr
*out = nullptr;
auto st = ExecEnv::GetInstance()->result_mgr()->fetch_arrow_data(statement_->query_id, out);
if (UNLIKELY(!st.ok())) {
LOG(INFO) << "11111111 k " << statement_->query_id;
LOG(WARNING) << st.to_string();
LOG(WARNING) << "ArrowFlightBatchReader fetch arrow data failed: " + st.to_string();
ARROW_RETURN_NOT_OK(to_arrow_status(st));
}
if (*out != nullptr) {
std::stringstream ss;
arrow_pretty_print(*(*out), &ss);
LOG(INFO) << "11111111 m " << (*out)->num_rows() << ", " << (*out)->num_columns();
LOG(INFO) << "11111111 l " << ss.str();
VLOG_NOTICE << "ArrowFlightBatchReader read next: " << (*out)->num_rows() << ", "
<< (*out)->num_columns();
}
return arrow::Status::OK();
}
Expand Down
2 changes: 0 additions & 2 deletions be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -709,7 +709,6 @@ void PInternalServiceImpl::fetch_arrow_flight_schema(google::protobuf::RpcContro
const PFetchArrowFlightSchemaRequest* request,
PFetchArrowFlightSchemaResult* result,
google::protobuf::Closure* done) {
LOG(INFO) << "fetch_arrow_flight_schema";
bool ret = _light_work_pool.try_offer([request, result, done]() {
brpc::ClosureGuard closure_guard(done);
RowDescriptor row_desc = ExecEnv::GetInstance()->result_mgr()->find_row_descriptor(
Expand Down Expand Up @@ -931,7 +930,6 @@ void PInternalServiceImpl::update_cache(google::protobuf::RpcController* control
void PInternalServiceImpl::fetch_cache(google::protobuf::RpcController* controller,
const PFetchCacheRequest* request, PFetchCacheResult* result,
google::protobuf::Closure* done) {
LOG(INFO) << "fetch_cache";
bool ret = _heavy_work_pool.try_offer([this, request, result, done]() {
brpc::ClosureGuard closure_guard(done);
_exec_env->result_cache()->fetch(request, result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -398,8 +398,8 @@ public class Config extends ConfigBase {
@ConfField(description = {"FE MySQL server 的端口号", "The port of FE MySQL server"})
public static int query_port = 9030;

@ConfField(description = {"FE Flight-SQL server 的端口号", "The port of FE Flight-SQL server"})
public static int flight_sql_query_port = 9040;
@ConfField(description = {"FE Arrow-Flight-SQL server 的端口号", "The port of FE Arrow-Flight-SQL server"})
public static int flight_sql_query_port = -1;

@ConfField(description = {"MySQL 服务的 IO 线程数", "The number of IO threads in MySQL service"})
public static int mysql_service_io_threads_num = 4;
Expand Down
12 changes: 0 additions & 12 deletions fe/fe-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ under the License.
<awssdk.version>2.17.257</awssdk.version>
<huaweiobs.version>3.1.1-hw-46</huaweiobs.version>
<tencentcos.version>3.3.5</tencentcos.version>
<commons-dbcp2.version>2.9.0</commons-dbcp2.version>
</properties>
<profiles>
<profile>
Expand Down Expand Up @@ -761,17 +760,6 @@ under the License.
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-vector</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-dbcp2</artifactId>
<version>${commons-dbcp2.version}</version>
<exclusions>
<exclusion>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,8 @@ public PlanFragment visitPhysicalDistribute(PhysicalDistribute<? extends Plan> d
public PlanFragment visitPhysicalResultSink(PhysicalResultSink<? extends Plan> physicalResultSink,
PlanTranslatorContext context) {
PlanFragment planFragment = physicalResultSink.child().accept(this, context);
planFragment.setSink(new ResultSink(planFragment.getPlanRoot().getId()));
planFragment.setSink(new ResultSink(planFragment.getPlanRoot().getId(),
ConnectContext.get().getResultSinkType()));
return planFragment;
}

Expand Down
9 changes: 5 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/qe/QeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,11 @@ public void start() throws Exception {
LOG.error("mysql server start failed");
System.exit(-1);
}
this.flightSqlService = new FlightSqlService(flightsqlPort);
if (!flightSqlService.start()) {
LOG.error("flightsql server start failed");
System.exit(-1);
if (flightsqlPort != -1) {
this.flightSqlService = new FlightSqlService(flightsqlPort);
if (!flightSqlService.start()) {
System.exit(-1);
}
}
LOG.info("QE service start.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,6 @@
import java.io.IOException;
import java.io.StringReader;
import java.nio.ByteBuffer;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -2571,7 +2570,8 @@ public List<ResultRow> executeInternalQuery() {
planner = new NereidsPlanner(statementContext);
planner.plan(parsedStmt, context.getSessionVariable().toThrift());
} catch (Exception e) {
LOG.warn("fall back to legacy planner, because: {}", e.getMessage(), e);
LOG.warn("Arrow Flight SQL fall back to legacy planner, because: {}",
e.getMessage(), e);
parsedStmt = null;
planner = null;
context.getState().setNereids(false);
Expand Down Expand Up @@ -2673,7 +2673,7 @@ public void executeArrowFlightQuery(FlightStatementContext flightStatementContex
coord.exec();
} catch (Exception e) {
queryScheduleSpan.recordException(e);
LOG.warn("Failed to coord exec, because: {}", e.getMessage(), e);
LOG.warn("Failed to coord exec Arrow Flight SQL, because: {}", e.getMessage(), e);
throw new InternalQueryExecutionException(e.getMessage() + Util.getRootCauseMessage(e), e);
} finally {
queryScheduleSpan.end();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.doris.common.Status;
import org.apache.doris.common.util.DebugUtil;
import org.apache.doris.common.util.Util;
import org.apache.doris.proto.InternalService;
import org.apache.doris.proto.Types;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.ConnectContext;
Expand Down Expand Up @@ -132,7 +133,6 @@ public static AutoCloseConnectContext buildConnectContext() {
ConnectContext connectContext = new ConnectContext();
SessionVariable sessionVariable = connectContext.getSessionVariable();
sessionVariable.internalSession = true;
sessionVariable.setEnableNereidsPlanner(false); // TODO
sessionVariable.setEnablePipelineEngine(false); // TODO
sessionVariable.setEnablePipelineXEngine(false); // TODO
connectContext.setEnv(Env.getCurrentEnv());
Expand Down

0 comments on commit 2dc22e3

Please sign in to comment.