Skip to content

Commit

Permalink
[fix](flink) Fix FetchResult and MemoryScratchSink stuck (apache#42216)
Browse files Browse the repository at this point in the history
Before each get queue, will set sink task dependency ready.
so if the sink task put queue faster than the fetch result get queue,
the queue size will always be 10.

Be sure to set sink dependency ready before getting queue.
otherwise, if queue is emptied after sink task put queue and before
block dependency, get queue will stuck and will never set sink
dependency ready.

Fix:
```
WARN  org.apache.doris.flink.backend.BackendClient                 [] - Get next from Doris BE{host='', port=9060} failed.
org.apache.doris.shaded.org.apache.thrift.transport.TTransportException: java.net.SocketTimeoutException: Read timed out
	at org.apache.doris.shaded.org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:179) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
	at org.apache.doris.shaded.org.apache.thrift.transport.TTransport.readAll(TTransport.java:109) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
	at org.apache.doris.shaded.org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:464) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
	at org.apache.doris.shaded.org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:362) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
	at org.apache.doris.shaded.org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:245) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
	at org.apache.doris.shaded.org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
	at org.apache.doris.sdk.thrift.TDorisExternalService$Client.recvGetNext(TDorisExternalService.java:92) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
	at org.apache.doris.sdk.thrift.TDorisExternalService$Client.getNext(TDorisExternalService.java:79) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
	at org.apache.doris.flink.backend.BackendClient.getNext(BackendClient.java:185) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
	at org.apache.doris.flink.source.reader.DorisValueReader.hasNext(DorisValueReader.java:243) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
	at org.apache.doris.flink.source.split.DorisSplitRecords.nextRecordFromSplit(DorisSplitRecords.java:71) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
	at org.apache.doris.flink.source.split.DorisSplitRecords.nextRecordFromSplit(DorisSplitRecords.java:34) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:140) ~[flink-connector-files-1.17.1.jar:1.17.1]
	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:417) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) [flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) [flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) [flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-dist-1.17.1.jar:1.17.1]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_191]
Caused by: java.net.SocketTimeoutException: Read timed out
	at java.net.SocketInputStream.socketRead0(Native Method) ~[?:1.8.0_191]
	at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) ~[?:1.8.0_191]
	at java.net.SocketInputStream.read(SocketInputStream.java:171) ~[?:1.8.0_191]
	at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[?:1.8.0_191]
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) ~[?:1.8.0_191]
	at java.io.BufferedInputStream.read1(BufferedInputStream.java:286) ~[?:1.8.0_191]
	at java.io.BufferedInputStream.read(BufferedInputStream.java:345) ~[?:1.8.0_191]
	at org.apache.doris.shaded.org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:177) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
	... 24 more
```
  • Loading branch information
xinyiZzz committed Oct 28, 2024
1 parent 86ce3f8 commit f0f4284
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 5 deletions.
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/memory_scratch_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ Status MemoryScratchSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
RETURN_IF_ERROR(convert_to_arrow_batch(block, block_arrow_schema, arrow::default_memory_pool(),
&result, _timezone_obj));
local_state._queue->blocking_put(result);
if (local_state._queue->size() < 10) {
if (local_state._queue->size() > config::max_memory_sink_batch_count) {
local_state._queue_dependency->block();
}
return Status::OK();
Expand Down
10 changes: 8 additions & 2 deletions be/src/runtime/record_batch_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,16 @@
namespace doris {

bool RecordBatchQueue::blocking_get(std::shared_ptr<arrow::RecordBatch>* result) {
auto res = _queue.blocking_get(result);
if (_dep && size() <= 10) {
if (_dep && size() <= config::max_memory_sink_batch_count) {
_dep->set_ready();
}
// Before each get queue, will set sink task dependency ready.
// so if the sink task put queue faster than the fetch result get queue,
// the queue size will always be 10.
// be sure to set sink dependency ready before getting queue.
// otherwise, if queue is emptied after sink task put queue and before block dependency,
// get queue will stuck and will never set sink dependency ready.
auto res = _queue.blocking_get(result);
return res;
}

Expand Down
6 changes: 4 additions & 2 deletions be/src/runtime/result_queue_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,10 @@ void ResultQueueMgr::create_queue(const TUniqueId& fragment_instance_id,
if (iter != _fragment_queue_map.end()) {
*queue = iter->second;
} else {
// the blocking queue size = 20 (default), in this way, one queue have 20 * 1024 rows at most
BlockQueueSharedPtr tmp(new RecordBatchQueue(config::max_memory_sink_batch_count));
// max_elements will not take effect, because when queue size reaches max_memory_sink_batch_count,
// MemoryScratchSink will block queue dependency, in this way, one queue have 20 * 1024 rows at most.
// use MemoryScratchSink queue dependency instead of BlockingQueue to achieve blocking.
BlockQueueSharedPtr tmp(new RecordBatchQueue(config::max_memory_sink_batch_count * 2));
_fragment_queue_map.insert(std::make_pair(fragment_instance_id, tmp));
*queue = tmp;
}
Expand Down

0 comments on commit f0f4284

Please sign in to comment.