Skip to content

Commit

Permalink
🐛 send StatusUpdateEvent after connected & 🔊 better logger & 🚨 make r…
Browse files Browse the repository at this point in the history
…uff happy
  • Loading branch information
j1g5awi committed Aug 21, 2024
1 parent 4345dd1 commit cdf0602
Show file tree
Hide file tree
Showing 7 changed files with 110 additions and 67 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
[![Pydantic v2](https://img.shields.io/endpoint?url=https://raw.githubusercontent.com/pydantic/pydantic/main/docs/badge/v2.json)](https://pydantic.dev)
[![PyPI Version](https://img.shields.io/pypi/v/nonebot-plugin-all4one.svg?style=flat-square)](https://pypi.python.org/pypi/nonebot-plugin-all4one)
[![OneBot 4A](https://img.shields.io/badge/OneBot-4A-black?style=flat-square)](https://onebot4all.vercel.app/)
[![NoneBot Version](https://img.shields.io/badge/nonebot-2.3.0+-red.svg?style=flat-square)](https://v2.nonebot.dev/)
[![NoneBot Version](https://img.shields.io/badge/nonebot-2.3.0+-red.svg?style=flat-square)](https://v2.nonebot.dev/)
[![NoneBot Registry](https://img.shields.io/endpoint?url=https%3A%2F%2Fnbbdg.lgc2333.top%2Fplugin%2Fnonebot-plugin-all4one)](https://registry.nonebot.dev/plugin/nonebot-plugin-all4one:nonebot_plugin_all4one)
[![Supported Adapters](https://img.shields.io/endpoint?url=https%3A%2F%2Fnbbdg.lgc2333.top%2Fplugin-adapters%2Fnonebot-plugin-all4one)](https://registry.nonebot.dev/plugin/nonebot-plugin-all4one:nonebot_plugin_all4one)

Expand Down
3 changes: 3 additions & 0 deletions nonebot_plugin_all4one/__version__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import importlib.metadata as importlib_metadata

__version__ = importlib_metadata.version(__package__ or "nonebot-plugin-all4one")
3 changes: 3 additions & 0 deletions nonebot_plugin_all4one/logger.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from nonebot.utils import logger_wrapper

log = logger_wrapper("041")
5 changes: 2 additions & 3 deletions nonebot_plugin_all4one/middlewares/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
from pathlib import Path
from typing import Optional, cast

from nonebot.log import logger

from ..logger import log
from .base import Middleware

MIDDLEWARE_MAP = {}
Expand All @@ -23,4 +22,4 @@
):
MIDDLEWARE_MAP[middleware.get_name()] = middleware
except Exception:
logger.warning(f"Failed to import {module_name}")
log("WARNING", f"Failed to import {module_name}")
5 changes: 3 additions & 2 deletions nonebot_plugin_all4one/middlewares/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ async def get_bot_self(self) -> BotSelf:
user_id=self.self_id,
**{
"supported_actions": await self.get_supported_actions(),
"supported_message_segments": await self.get_supported_message_segments(),
"supported_message_segments": await self.get_supported_message_segments(), # noqa: E501
},
)

Expand Down Expand Up @@ -100,7 +100,8 @@ async def send_message(
"""发送消息
参数:
detail_type: 发送的类型,可以为 private、group 或扩展的类型,和消息事件的 detail_type 字段对应
detail_type: 发送的类型,可以为 private、group 或扩展的类型,
和消息事件的 detail_type 字段对应
user_id: 用户 ID,当 detail_type 为 private 时必须传入
group_id: 群 ID,当 detail_type 为 group 时必须传入
guild_id: Guild 群组 ID,当 detail_type 为 channel 时必须传入
Expand Down
6 changes: 0 additions & 6 deletions nonebot_plugin_all4one/middlewares/onebot_v11.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,6 @@ def get_platform(self):
return "qq"

async def to_onebot_event(self, event: Event) -> list[OneBotEvent]:
replaced_by_detail_type = (
"notice_type",
"message_type",
"request_type",
"meta_event_type",
)
should_be_str = (
"message_id",
"user_id",
Expand Down
153 changes: 98 additions & 55 deletions nonebot_plugin_all4one/onebotimpl/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@
from functools import partial
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from typing import Any, Union, Literal, Optional, cast
from typing import Any, Union, Literal, ClassVar, Optional, cast
from asyncio import Task, Queue, sleep, gather, wait_for, create_task

import msgpack
from nonebot.log import logger
from nonebot.adapters import Bot
from nonebot.utils import escape_tag
from nonebot.exception import WebSocketClosed
Expand Down Expand Up @@ -38,7 +37,9 @@
WebSocketServerSetup,
)

from ..logger import log
from .utils import encode_data
from ..__version__ import __version__
from ..middlewares import MIDDLEWARE_MAP, Middleware
from .config import (
Config,
Expand All @@ -50,6 +51,10 @@


class OneBotImplementation:
USER_AGENT: ClassVar[str] = f"OneBot4All NoneBot-Plugin-All4One/{__version__}"
ONEBOT_VERSION: ClassVar[str] = "A"
IMPL_NAME: ClassVar[str] = "nonebot-plugin-all4one"

def __init__(self, driver: Driver):
self.driver = driver
self.config = Config(**self.driver.config.model_dump())
Expand Down Expand Up @@ -89,14 +94,10 @@ def register_middleware(self, middleware: type[Middleware]):
"""注册一个中间件"""
name = middleware.get_name()
if name in self._middlewares:
logger.opt(colors=True).warning(
f'Middleware "<y>{escape_tag(name)}</y>" already exists'
)
log("WARNING", f'Middleware "<y>{escape_tag(name)}</y>" already exists')
return
self._middlewares[name] = middleware
logger.opt(colors=True).info(
f'Succeeded to load middleware "<y>{escape_tag(name)}</y>"'
)
log("INFO", f'Succeeded to load middleware "<y>{escape_tag(name)}</y>"')

async def _call_api(self, data: dict[str, Any]) -> Any:
try:
Expand Down Expand Up @@ -190,9 +191,9 @@ async def get_version(
kwargs: 扩展字段
"""
return {
"impl": "nonebot-plugin-all4one",
"version": "0.1.0",
"onebot_version": "12",
"impl": self.IMPL_NAME,
"version": __version__,
"onebot_version": self.ONEBOT_VERSION,
}

def _check_access_token(
Expand All @@ -215,20 +216,20 @@ async def _ws_send(
websocket: WebSocket,
conn: Union[WebsocketConfig, WebsocketReverseConfig],
) -> None:
queue = Queue()
queue = Queue[Event]()
self.queues.append(queue)
try:
while True:
event = await queue.get()
await websocket.send(encode_data(event.dict(), conn.use_msgpack))
await websocket.send(encode_data(event.model_dump(), conn.use_msgpack))
except WebSocketClosed:
logger.opt(colors=True).warning(
"<y><bg #f8bbd0>WebSocket Closed</bg #f8bbd0></y>"
)
except Exception:
logger.opt(colors=True).exception(
"<r><bg #f8bbd0>Error while process data from websocket"
". Trying to reconnect...</bg #f8bbd0></r>"
log("WARNING", "<y>WebSocket Closed</y>")
except Exception as e:
log(
"ERROR",
"<r>Error while process data from websocket. "
"Trying to reconnect...</r>",
e,
)
finally:
self.queues.remove(queue)
Expand All @@ -247,7 +248,7 @@ async def _ws_recv(self, websocket: WebSocket) -> None:
if "echo" in data:
echo = data["echo"]
resp = await self._call_api(data)
# 格式错误(包括实现不支持 MessagePack 的情况)、必要字段缺失或字段类型错误
# 格式错误(包括实现不支持 MessagePack 的情况)、必要字段缺失或类型错误
except (json.JSONDecodeError, msgpack.UnpackException):
resp = {
"status": "failed",
Expand All @@ -267,11 +268,13 @@ async def _ws_recv(self, websocket: WebSocket) -> None:
resp["echo"] = echo
await websocket.send(encode_data(resp, isinstance(raw_data, bytes)))
except WebSocketClosed:
logger.opt(colors=True).warning("WebSocket closed by peer")
log("WARNING", "WebSocket closed by peer")
# 与 WebSocket 服务器的连接发生了意料之外的错误
except Exception:
logger.opt(colors=True).exception(
"<r><bg #f8bbd0>Error while process data from websocket</bg #f8bbd0></r>"
except Exception as e:
log(
"ERROR",
"<r>Error while process data from websocket</r>",
e,
)

async def _handle_http(
Expand All @@ -283,7 +286,7 @@ async def _handle_http(
if response := self._check_access_token(request, conn.access_token):
return response

# 如果收到不支持的 Content-Type 请求头,必须返回 HTTP 状态码 415 Unsupported Media Type
# 如果收到不支持的 Content-Type 请求头,必须返回 HTTP 状态码 415
content_type = request.headers.get("Content-Type")
if content_type not in ("application/json", "application/msgpack"):
return Response(415, content="Invalid Content-Type")
Expand Down Expand Up @@ -341,6 +344,19 @@ async def _handle_ws(self, conn: WebsocketConfig, websocket: WebSocket) -> None:
conn.use_msgpack,
)
)
await websocket.send(
encode_data(
StatusUpdateMetaEvent(
id=uuid.uuid4().hex,
time=datetime.now(),
type="meta",
detail_type="status_update",
sub_type="",
status=await self.get_status(),
).model_dump(),
conn.use_msgpack,
)
)
t1 = create_task(self._ws_send(websocket, conn))
t2 = create_task(self._ws_recv(websocket))
await t2
Expand All @@ -351,22 +367,32 @@ async def _http_webhook(self, conn: HTTPWebhookConfig):
"Content-Type": (
"application/msgpack" if conn.use_msgpack else "application/json"
),
"User-Agent": "OneBot/12 NoneBot Plugin All4One/0.1.0",
"X-OneBot-Version": "12",
"X-Impl": "nonebot-plugin-all4one",
"User-Agent": self.USER_AGENT,
"X-OneBot-Version": self.ONEBOT_VERSION,
"X-Impl": self.IMPL_NAME,
}
if conn.access_token:
headers["Authorization"] = f"Bearer {conn.access_token}"
queue = Queue()
queue = Queue[Event]()
self.queues.append(queue)
await queue.put(
StatusUpdateMetaEvent(
id=uuid.uuid4().hex,
time=datetime.now(),
type="meta",
detail_type="status_update",
sub_type="",
status=await self.get_status(),
)
)
while True:
try:
event = await queue.get()
request = Request(
"POST",
str(conn.url),
headers=headers,
content=encode_data(event.dict(), conn.use_msgpack),
content=encode_data(event.model_dump(), conn.use_msgpack),
)
resp = await self.request(request)
if resp.status_code == 200:
Expand All @@ -380,32 +406,33 @@ async def _http_webhook(self, conn: HTTPWebhookConfig):
elif content_type == "application/json":
data = json.loads(resp.content)
else:
logger.error("Invalid Content-Type")
log("ERROR", "Invalid Content-Type")
continue
for action in data:
await self._call_api(action)
# 动作请求执行出错
except Exception:
logger.exception("HTTP Webhook Response action failed")
except Exception as e:
log("ERROR", "HTTP Webhook Response action failed", e)
# 事件推送成功,并不做更多处理
elif resp.status_code == 204:
pass
# 事件推送失败
else:
logger.error(f"HTTP Webhook event push failed: {resp}")
log("ERROR", f"HTTP Webhook event push failed: {resp}")
except (NotImplementedError, TypeError):
logger.error(
f"Current driver {self.driver.type} does not support http client"
log(
"ERROR",
f"Current driver {self.driver.type} does not support http client",
)
self.queues.remove(queue)
break
except Exception:
logger.exception("HTTP Webhook event push failed")
except Exception as e:
log("ERROR", "HTTP Webhook event push failed", e)

async def _websocket_rev(self, conn: WebsocketReverseConfig) -> None:
headers = {
"User-Agent": "OneBot/12 NoneBot Plugin All4One/0.1.0",
"Sec-WebSocket-Protocol": "12.nonebot-plugin-all4one",
"User-Agent": self.USER_AGENT,
"Sec-WebSocket-Protocol": f"{self.ONEBOT_VERSION}.{self.IMPL_NAME}",
}
if conn.access_token:
headers["Authorization"] = f"Bearer {conn.access_token}"
Expand All @@ -432,28 +459,44 @@ async def _websocket_rev(self, conn: WebsocketReverseConfig) -> None:
conn.use_msgpack,
)
)
await ws.send(
encode_data(
StatusUpdateMetaEvent(
id=uuid.uuid4().hex,
time=datetime.now(),
type="meta",
detail_type="status_update",
sub_type="",
status=await self.get_status(),
).model_dump(),
conn.use_msgpack,
)
)
t1 = create_task(self._ws_send(ws, conn))
t2 = create_task(self._ws_recv(ws))
await t2
t1.cancel()
except WebSocketClosed:
logger.opt(colors=True).warning(
"<y><bg #f8bbd0>WebSocket Closed</bg #f8bbd0></y>"
)
except Exception:
logger.opt(colors=True).exception(
"<r><bg #f8bbd0>Error while process data from websocket"
f"{escape_tag(str(conn.url))}. Trying to reconnect...</bg #f8bbd0></r>",
log("WARNING", "<y>WebSocket Closed</y>")
except Exception as e:
log(
"ERROR",
"<r>Error while process data from websocket"
f"{escape_tag(str(conn.url))}. Trying to reconnect...</r>",
e,
)
except (NotImplementedError, TypeError):
logger.error(
f"Current driver {self.driver.type} does not support websocket server"
log(
"ERROR",
f"Current driver {self.driver.type} "
"does not support websocket server",
)
break
except Exception:
logger.opt(colors=True).warning(
"<y><bg #f8bbd0>Error while setup websocket to "
f"{escape_tag(str(conn.url))}. Trying to reconnect...</bg #f8bbd0></y>",
log(
"WARNING",
"<y>Error while setup websocket to "
f"{escape_tag(str(conn.url))}. Trying to reconnect...</y>",
)
await sleep(conn.reconnect_interval)

Expand Down Expand Up @@ -502,7 +545,7 @@ def _register_middlewares(self, middlewares: Optional[set[str]] = None):
if middleware in MIDDLEWARE_MAP:
self.register_middleware(MIDDLEWARE_MAP[middleware])
else:
logger.error(f"Can not find middleware for Adapter {middleware}")
log("ERROR", f"Can not find middleware for Adapter {middleware}")

def setup(self):
@self.driver.on_startup
Expand All @@ -512,7 +555,7 @@ async def _():
if isinstance(conn, HTTPConfig):
queue = None
if conn.event_enabled:
queue = Queue(conn.event_buffer_size)
queue = Queue[Event](conn.event_buffer_size)
self.setup_http_server(
HTTPServerSetup(
URL("/all4one/"),
Expand Down

0 comments on commit cdf0602

Please sign in to comment.