-
Notifications
You must be signed in to change notification settings - Fork 8
/
Copy pathsession.py
75 lines (62 loc) · 2.06 KB
/
session.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# encoding: utf-8
"""
@author: john
@contact: [email protected]
@file: session.py
@time: 2021/9/2 上午3:25
@desc:
"""
__all__ = ['Session']
from typing import Any
from types import SimpleNamespace
import asyncio
from urllib.parse import urljoin
import logging
from aiohttp import ClientSession, ClientResponse
from aiohttp.tracing import TraceConfig as TraceConfig
from aiohttp.tracing import TraceRequestStartParams, TraceRequestEndParams
logger = logging.getLogger()
async def on_request_start(
session: ClientSession,
trace_config_ctx: SimpleNamespace,
params: TraceRequestStartParams):
msg = f'''
Start request url {params.url}:
request method: {params.method}
request headers:\n {params.headers}
'''
logger.info(msg)
async def on_request_end(
session: ClientSession,
trace_config_ctx: SimpleNamespace,
params: TraceRequestEndParams
):
resp = params.response
text = await resp.text()
msg = f'''
End request url {params.url}:
response http code: {resp.status}
response headers:\n {resp.headers}
response body:\n {text}
'''
logger.info(msg)
class Session:
def __init__(self, base_url: str, debug: bool = True, **kwargs: Any) -> None:
self.base_url = base_url
trace_configs = []
if debug:
trace_config = TraceConfig()
trace_config.on_request_start.append(on_request_start)
trace_config.on_request_end.append(on_request_end)
trace_configs.append(trace_config)
self._session = ClientSession(trace_configs=trace_configs, **kwargs)
async def request(self, method: str, path: str, **kwargs: Any) -> ClientResponse:
url = urljoin(self.base_url, path)
async with self._session.request(
method, url, **kwargs) as resp:
return resp
async def close(self) -> None:
await asyncio.sleep(0) # Graceful Shutdown
await self._session.close()
def update_headers(self, headers: dict) -> None:
self._session.headers.update(headers)