Skip to content

Commit

Permalink
Wait until buffer flush (#1385)
Browse files Browse the repository at this point in the history
* Wait until all data in buffer is flushed to client when upstream server finishes.

(cherry picked from commit d776506)

* Wait until buffer flush

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

* Avoid shadowing

* _teared not _teardown

* Refactor logic

* Do not try `read_from_descriptors` if reads have previously torn down

* [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci

---------

Co-authored-by: yk <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Apr 13, 2024
1 parent 2fa320d commit 5ab40f2
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 25 deletions.
21 changes: 13 additions & 8 deletions helper/monitor_open_files.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,20 @@ if [[ -z "$PROXY_PY_PID" ]]; then
exit 1
fi

OPEN_FILES_BY_MAIN=$(lsof -p "$PROXY_PY_PID" | wc -l)
echo "[$PROXY_PY_PID] Main process: $OPEN_FILES_BY_MAIN"
while true;
do
OPEN_FILES_BY_MAIN=$(lsof -p "$PROXY_PY_PID" | wc -l)
echo "[$PROXY_PY_PID] Main process: $OPEN_FILES_BY_MAIN"

pgrep -P "$PROXY_PY_PID" | while read -r acceptorPid; do
OPEN_FILES_BY_ACCEPTOR=$(lsof -p "$acceptorPid" | wc -l)
echo "[$acceptorPid] Acceptor process: $OPEN_FILES_BY_ACCEPTOR"
pgrep -P "$PROXY_PY_PID" | while read -r acceptorPid; do
OPEN_FILES_BY_ACCEPTOR=$(lsof -p "$acceptorPid" | wc -l)
echo "[$acceptorPid] Acceptor process: $OPEN_FILES_BY_ACCEPTOR"

pgrep -P "$acceptorPid" | while read -r childPid; do
OPEN_FILES_BY_CHILD_PROC=$(lsof -p "$childPid" | wc -l)
echo " [$childPid] child process: $OPEN_FILES_BY_CHILD_PROC"
pgrep -P "$acceptorPid" | while read -r childPid; do
OPEN_FILES_BY_CHILD_PROC=$(lsof -p "$childPid" | wc -l)
echo " [$childPid] child process: $OPEN_FILES_BY_CHILD_PROC"
done
done

sleep 1
done
6 changes: 5 additions & 1 deletion proxy/core/connection/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,11 @@ def flush(self, max_send_size: Optional[int] = None) -> int:
# TODO: Assemble multiple packets if total
# size remains below max send size.
max_send_size = max_send_size or DEFAULT_MAX_SEND_SIZE
sent: int = self.send(mv[:max_send_size])
try:
sent: int = self.send(mv[:max_send_size])
except BlockingIOError:
logger.warning('BlockingIOError when trying send to {0}'.format(self.tag))
return 0
if sent == len(mv):
self.buffer.pop(0)
self._num_buffer -= 1
Expand Down
29 changes: 17 additions & 12 deletions proxy/http/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ def __init__(self, *args: Any, **kwargs: Any):
if not self.flags.threadless:
self.selector = selectors.DefaultSelector()
self.plugin: Optional[HttpProtocolHandlerPlugin] = None
self.writes_teared: bool = False
self.reads_teared: bool = False

##
# initialize, is_inactive, shutdown, get_events, handle_events
Expand Down Expand Up @@ -137,23 +139,26 @@ async def handle_events(
) -> bool:
"""Returns True if proxy must tear down."""
# Flush buffer for ready to write sockets
teardown = await self.handle_writables(writables)
if teardown:
self.writes_teared = await self.handle_writables(writables)
if self.writes_teared:
return True
# Invoke plugin.write_to_descriptors
if self.plugin:
teardown = await self.plugin.write_to_descriptors(writables)
if teardown:
self.writes_teared = await self.plugin.write_to_descriptors(writables)
if self.writes_teared:
return True
# Read from ready to read sockets
teardown = await self.handle_readables(readables)
if teardown:
# Read from ready to read sockets if reads have not already teared down
if not self.reads_teared:
self.reads_teared = await self.handle_readables(readables)
if not self.reads_teared:
# Invoke plugin.read_from_descriptors
if self.plugin:
self.reads_teared = await self.plugin.read_from_descriptors(
readables,
)
# Wait until client buffer has flushed when reads has teared down but we can still write
if self.reads_teared and not self.work.has_buffer():
return True
# Invoke plugin.read_from_descriptors
if self.plugin:
teardown = await self.plugin.read_from_descriptors(readables)
if teardown:
return True
return False

def handle_data(self, data: memoryview) -> Optional[bool]:
Expand Down
12 changes: 8 additions & 4 deletions proxy/http/proxy/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,11 @@ async def get_descriptors(self) -> Descriptors:
return r, w

async def write_to_descriptors(self, w: Writables) -> bool:
if (self.upstream and self.upstream.connection.fileno() not in w) or not self.upstream:
if (
self.upstream
and not self.upstream.closed
and self.upstream.connection.fileno() not in w
) or not self.upstream:
# Currently, we just call write/read block of each plugins. It is
# plugins responsibility to ignore this callback, if passed descriptors
# doesn't contain the descriptor they registered.
Expand Down Expand Up @@ -208,9 +212,9 @@ async def write_to_descriptors(self, w: Writables) -> bool:

async def read_from_descriptors(self, r: Readables) -> bool:
if (
self.upstream and not
self.upstream.closed and
self.upstream.connection.fileno() not in r
self.upstream
and not self.upstream.closed
and self.upstream.connection.fileno() not in r
) or not self.upstream:
# Currently, we just call write/read block of each plugins. It is
# plugins responsibility to ignore this callback, if passed descriptors
Expand Down

0 comments on commit 5ab40f2

Please sign in to comment.