Skip to content

Commit

Permalink
Wait until buffer flush
Browse files Browse the repository at this point in the history
  • Loading branch information
abhinavsingh committed Apr 13, 2024
1 parent 68e1c41 commit 5fd393c
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 34 deletions.
23 changes: 14 additions & 9 deletions helper/monitor_open_files.sh
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,20 @@ if [[ -z "$PROXY_PY_PID" ]]; then
exit 1
fi

OPEN_FILES_BY_MAIN=$(lsof -p "$PROXY_PY_PID" | wc -l)
echo "[$PROXY_PY_PID] Main process: $OPEN_FILES_BY_MAIN"
while true;
do
OPEN_FILES_BY_MAIN=$(lsof -p "$PROXY_PY_PID" | wc -l)
echo "[$PROXY_PY_PID] Main process: $OPEN_FILES_BY_MAIN"

pgrep -P "$PROXY_PY_PID" | while read -r acceptorPid; do
OPEN_FILES_BY_ACCEPTOR=$(lsof -p "$acceptorPid" | wc -l)
echo "[$acceptorPid] Acceptor process: $OPEN_FILES_BY_ACCEPTOR"
pgrep -P "$PROXY_PY_PID" | while read -r acceptorPid; do
OPEN_FILES_BY_ACCEPTOR=$(lsof -p "$acceptorPid" | wc -l)
echo "[$acceptorPid] Acceptor process: $OPEN_FILES_BY_ACCEPTOR"

pgrep -P "$acceptorPid" | while read -r childPid; do
OPEN_FILES_BY_CHILD_PROC=$(lsof -p "$childPid" | wc -l)
echo " [$childPid] child process: $OPEN_FILES_BY_CHILD_PROC"
pgrep -P "$acceptorPid" | while read -r childPid; do
OPEN_FILES_BY_CHILD_PROC=$(lsof -p "$childPid" | wc -l)
echo " [$childPid] child process: $OPEN_FILES_BY_CHILD_PROC"
done
done
done

sleep 1
done
43 changes: 22 additions & 21 deletions proxy/http/handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ class HttpProtocolHandler(BaseTcpServerHandler[HttpClientConnection]):
Accepts `Client` connection and delegates to HttpProtocolHandlerPlugin.
"""

server_teared = False

def __init__(self, *args: Any, **kwargs: Any):
super().__init__(*args, **kwargs)
self.start_time: float = time.time()
Expand Down Expand Up @@ -131,6 +129,9 @@ async def get_events(self) -> SelectableEvents:
events[wfileno] |= selectors.EVENT_WRITE
return events

writes_teardown: bool = False
reads_teardown: bool = False

# We override super().handle_events and never call it
async def handle_events(
self,
Expand All @@ -139,26 +140,26 @@ async def handle_events(
) -> bool:
"""Returns True if proxy must tear down."""
# Flush buffer for ready to write sockets
teardown = await self.handle_writables(writables)
if teardown:
return True
# Invoke plugin.write_to_descriptors
if self.plugin:
teardown = await self.plugin.write_to_descriptors(writables)
if teardown:
return True
if not self.server_teared:
# Read from ready to read sockets
teardown = await self.handle_readables(readables)
if teardown:
self.server_teared = True
# Invoke plugin.read_from_descriptors
self.writes_teardown = await self.handle_writables(writables)
if not self.writes_teardown:
# Invoke plugin.write_to_descriptors
if self.plugin:
teardown = await self.plugin.read_from_descriptors(readables)
if teardown:
self.server_teared = True
if self.server_teared and not self.work.has_buffer():
# Wait until client buffer flushed when server teared down.
self.writes_teardown = await self.plugin.write_to_descriptors(writables)
if not self.writes_teardown:
# Read from ready to read sockets
self.reads_teardown = await self.handle_readables(readables)
if not self.reads_teardown:
# Invoke plugin.read_from_descriptors
if self.plugin:
self.reads_teardown = await self.plugin.read_from_descriptors(
readables
)
# Wait until client buffer has flushed when reads has teared down, but we can still write
if (
self.reads_teardown
and not self.writes_teardown
and not self.work.has_buffer()
):
return True
return False

Expand Down
12 changes: 8 additions & 4 deletions proxy/http/proxy/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,11 @@ async def get_descriptors(self) -> Descriptors:
return r, w

async def write_to_descriptors(self, w: Writables) -> bool:
if (self.upstream and self.upstream.connection.fileno() not in w) or not self.upstream:
if (
self.upstream
and not self.upstream.closed
and self.upstream.connection.fileno() not in w
) or not self.upstream:
# Currently, we just call write/read block of each plugins. It is
# plugins responsibility to ignore this callback, if passed descriptors
# doesn't contain the descriptor they registered.
Expand Down Expand Up @@ -208,9 +212,9 @@ async def write_to_descriptors(self, w: Writables) -> bool:

async def read_from_descriptors(self, r: Readables) -> bool:
if (
self.upstream and not
self.upstream.closed and
self.upstream.connection.fileno() not in r
self.upstream
and not self.upstream.closed
and self.upstream.connection.fileno() not in r
) or not self.upstream:
# Currently, we just call write/read block of each plugins. It is
# plugins responsibility to ignore this callback, if passed descriptors
Expand Down

0 comments on commit 5fd393c

Please sign in to comment.