From f0f4284033e62a497b1ab9487876794225c1ef34 Mon Sep 17 00:00:00 2001
From: Xinyi Zou
Date: Tue, 22 Oct 2024 19:36:34 +0800
Subject: [PATCH] [fix](flink) Fix FetchResult and MemoryScratchSink stuck (#42216)

Before each get from the queue, the sink task dependency is set ready. So if
the sink task puts into the queue faster than the fetch result thread gets
from it, the queue size will always stay at 10.

Be sure to set the sink dependency ready before getting from the queue.
Otherwise, if the queue is emptied after the sink task puts into it and before
it blocks the dependency, the get from the queue will be stuck and will never
set the sink dependency ready.

Fix:
```
WARN org.apache.doris.flink.backend.BackendClient [] - Get next from Doris BE{host='', port=9060} failed.
org.apache.doris.shaded.org.apache.thrift.transport.TTransportException: java.net.SocketTimeoutException: Read timed out
	at org.apache.doris.shaded.org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:179) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
	at org.apache.doris.shaded.org.apache.thrift.transport.TTransport.readAll(TTransport.java:109) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
	at org.apache.doris.shaded.org.apache.thrift.protocol.TBinaryProtocol.readAll(TBinaryProtocol.java:464) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
	at org.apache.doris.shaded.org.apache.thrift.protocol.TBinaryProtocol.readI32(TBinaryProtocol.java:362) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
	at org.apache.doris.shaded.org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:245) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
	at org.apache.doris.shaded.org.apache.thrift.TServiceClient.receiveBase(TServiceClient.java:77) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
	at org.apache.doris.sdk.thrift.TDorisExternalService$Client.recvGetNext(TDorisExternalService.java:92) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
	at org.apache.doris.sdk.thrift.TDorisExternalService$Client.getNext(TDorisExternalService.java:79) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
	at org.apache.doris.flink.backend.BackendClient.getNext(BackendClient.java:185) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
	at org.apache.doris.flink.source.reader.DorisValueReader.hasNext(DorisValueReader.java:243) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
	at org.apache.doris.flink.source.split.DorisSplitRecords.nextRecordFromSplit(DorisSplitRecords.java:71) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
	at org.apache.doris.flink.source.split.DorisSplitRecords.nextRecordFromSplit(DorisSplitRecords.java:34) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
	at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:140) ~[flink-connector-files-1.17.1.jar:1.17.1]
	at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:417) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:550) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:839) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:788) ~[flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:952) [flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:931) [flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:745) [flink-dist-1.17.1.jar:1.17.1]
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) [flink-dist-1.17.1.jar:1.17.1]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_191]
Caused by: java.net.SocketTimeoutException: Read timed out
	at java.net.SocketInputStream.socketRead0(Native Method) ~[?:1.8.0_191]
	at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) ~[?:1.8.0_191]
	at java.net.SocketInputStream.read(SocketInputStream.java:171) ~[?:1.8.0_191]
	at java.net.SocketInputStream.read(SocketInputStream.java:141) ~[?:1.8.0_191]
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246) ~[?:1.8.0_191]
	at java.io.BufferedInputStream.read1(BufferedInputStream.java:286) ~[?:1.8.0_191]
	at java.io.BufferedInputStream.read(BufferedInputStream.java:345) ~[?:1.8.0_191]
	at org.apache.doris.shaded.org.apache.thrift.transport.TIOStreamTransport.read(TIOStreamTransport.java:177) ~[blob_p-44cf081797997465dd46a38b036d2b88e1b6e4d4-bb500662c4f1b3245c2c995a4e691a8a:2.1.4]
	... 24 more
```
---
 be/src/pipeline/exec/memory_scratch_sink_operator.cpp |  2 +-
 be/src/runtime/record_batch_queue.cpp                 | 10 ++++++++--
 be/src/runtime/result_queue_mgr.cpp                   |  6 ++++--
 3 files changed, 13 insertions(+), 5 deletions(-)

diff --git a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp
index 69e30791c139af..131f3caf42c6db 100644
--- a/be/src/pipeline/exec/memory_scratch_sink_operator.cpp
+++ b/be/src/pipeline/exec/memory_scratch_sink_operator.cpp
@@ -100,7 +100,7 @@ Status MemoryScratchSinkOperatorX::sink(RuntimeState* state, vectorized::Block*
     RETURN_IF_ERROR(convert_to_arrow_batch(block, block_arrow_schema, arrow::default_memory_pool(),
                                            &result, _timezone_obj));
     local_state._queue->blocking_put(result);
-    if (local_state._queue->size() < 10) {
+    if (local_state._queue->size() > config::max_memory_sink_batch_count) {
         local_state._queue_dependency->block();
     }
     return Status::OK();
diff --git a/be/src/runtime/record_batch_queue.cpp b/be/src/runtime/record_batch_queue.cpp
index 83982688880948..25db550db3a7f1 100644
--- a/be/src/runtime/record_batch_queue.cpp
+++ b/be/src/runtime/record_batch_queue.cpp
@@ -23,10 +23,16 @@ namespace doris {
 bool RecordBatchQueue::blocking_get(std::shared_ptr<arrow::RecordBatch>* result) {
-    auto res = _queue.blocking_get(result);
-    if (_dep && size() <= 10) {
+    if (_dep && size() <= config::max_memory_sink_batch_count) {
         _dep->set_ready();
     }
+    // Before each get from the queue, the sink task dependency is set ready.
+    // So if the sink task puts into the queue faster than the fetch result thread
+    // gets from it, the queue size will always stay at 10.
+    // Be sure to set the sink dependency ready before getting from the queue.
+    // Otherwise, if the queue is emptied after the sink task puts into it and before
+    // it blocks the dependency, the get will be stuck and will never set the sink dependency ready.
+    auto res = _queue.blocking_get(result);
     return res;
 }
diff --git a/be/src/runtime/result_queue_mgr.cpp b/be/src/runtime/result_queue_mgr.cpp
index 8090a3e6ee4787..8a6e5b1093542d 100644
--- a/be/src/runtime/result_queue_mgr.cpp
+++ b/be/src/runtime/result_queue_mgr.cpp
@@ -82,8 +82,10 @@ void ResultQueueMgr::create_queue(const TUniqueId& fragment_instance_id,
     if (iter != _fragment_queue_map.end()) {
         *queue = iter->second;
     } else {
-        // the blocking queue size = 20 (default), in this way, one queue have 20 * 1024 rows at most
-        BlockQueueSharedPtr tmp(new RecordBatchQueue(config::max_memory_sink_batch_count));
+        // max_elements will not take effect, because when the queue size reaches max_memory_sink_batch_count,
+        // MemoryScratchSink will block the queue dependency; in this way, one queue holds 20 * 1024 rows at most.
+        // The MemoryScratchSink queue dependency, rather than the BlockingQueue limit, is used to achieve blocking.
+        BlockQueueSharedPtr tmp(new RecordBatchQueue(config::max_memory_sink_batch_count * 2));
         _fragment_queue_map.insert(std::make_pair(fragment_instance_id, tmp));
         *queue = tmp;
     }
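
As a purely illustrative aside (not part of the patch, and not Doris code): the sketch below shows the handshake the commit message describes, with the consumer setting the producer's dependency ready before blocking on the queue, and the producer blocking its dependency only after a put that pushes the queue past the cap. `ToyDependency`, `ToyQueue`, `kMaxBatchCount`, and the two loops are made-up stand-ins for `Dependency`, `BlockingQueue`, `config::max_memory_sink_batch_count`, `MemoryScratchSinkOperatorX::sink()`, and `RecordBatchQueue::blocking_get()`.

```cpp
// Toy sketch of the fixed handshake; simplified stand-ins only, not Doris code.
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <iostream>
#include <mutex>
#include <thread>

constexpr std::size_t kMaxBatchCount = 10; // stand-in for config::max_memory_sink_batch_count

// Stand-in for the pipeline Dependency: the sink task only runs while "ready".
class ToyDependency {
public:
    void block() {
        std::lock_guard<std::mutex> l(_m);
        _ready = false;
    }
    void set_ready() {
        { std::lock_guard<std::mutex> l(_m); _ready = true; }
        _cv.notify_all();
    }
    void wait_until_ready() {
        std::unique_lock<std::mutex> l(_m);
        _cv.wait(l, [&] { return _ready; });
    }

private:
    std::mutex _m;
    std::condition_variable _cv;
    bool _ready = true; // starts ready, like the sink's queue dependency
};

// Unbounded blocking queue: back-pressure comes from ToyDependency, not from
// the queue's own capacity (mirroring the max_elements comment in the patch).
class ToyQueue {
public:
    void blocking_put(int v) {
        { std::lock_guard<std::mutex> l(_m); _q.push_back(v); }
        _cv.notify_one();
    }
    int blocking_get() {
        std::unique_lock<std::mutex> l(_m);
        _cv.wait(l, [&] { return !_q.empty(); });
        int v = _q.front();
        _q.pop_front();
        return v;
    }
    std::size_t size() {
        std::lock_guard<std::mutex> l(_m);
        return _q.size();
    }

private:
    std::mutex _m;
    std::condition_variable _cv;
    std::deque<int> _q;
};

int main() {
    ToyQueue queue;
    ToyDependency dep;

    // Producer, in the spirit of MemoryScratchSinkOperatorX::sink() after the fix:
    // put first, then block the dependency only once the queue exceeds the cap.
    std::thread producer([&] {
        for (int i = 0; i < 1000; ++i) {
            dep.wait_until_ready(); // the sink task is only scheduled while ready
            queue.blocking_put(i);
            if (queue.size() > kMaxBatchCount) {
                dep.block(); // back-pressure: wait for the consumer to catch up
            }
        }
    });

    // Consumer, in the spirit of RecordBatchQueue::blocking_get() after the fix:
    // set the dependency ready BEFORE blocking on the queue, so the producer can
    // always refill an empty queue and wake this thread up.
    std::thread consumer([&] {
        for (int i = 0; i < 1000; ++i) {
            if (queue.size() <= kMaxBatchCount) {
                dep.set_ready();
            }
            (void)queue.blocking_get();
        }
    });

    producer.join();
    consumer.join();
    std::cout << "drained without hanging" << std::endl;
    return 0;
}
```

With the pre-fix ordering (set the dependency ready only after a successful get, and block the sink whenever the queue is small), the consumer could end up blocked on an empty queue while the producer was blocked on the dependency, so neither side made progress; that is the hang that surfaced as the `Read timed out` log above.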