-
Notifications
You must be signed in to change notification settings - Fork 4
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
2 changed files
with
156 additions
and
62 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,23 +1,20 @@ | ||
from nonebot.adapters.onebot.v11 import ( | ||
Bot, | ||
Message, | ||
MessageEvent, | ||
MessageSegment, | ||
GroupMessageEvent | ||
) | ||
from random import choice | ||
from typing import Annotated | ||
|
||
from nonebot import get_bot | ||
from nonebot.adapters.onebot.v11 import MessageSegment | ||
from nonebot.params import RegexGroup, ArgPlainText, CommandArg, CommandStart | ||
from nonebot.plugin import on_regex, on_command, require, PluginMetadata | ||
from nonebot.params import RegexGroup, ArgPlainText, CommandArg | ||
from nonebot.log import logger | ||
from nonebot import get_driver, get_bot | ||
from nonebot.matcher import Matcher | ||
from nonebot.rule import to_me | ||
from nonebot.exception import ActionFailed | ||
from typing import Tuple, Any | ||
from random import choice | ||
from datetime import datetime | ||
|
||
from .config import Config | ||
from .utils import * | ||
from .hoyospider import * | ||
from .utils import * | ||
|
||
require("nonebot_plugin_templates") | ||
from nonebot_plugin_templates.template_types import * | ||
from nonebot_plugin_templates.templates_render import menu_render | ||
|
||
try: | ||
scheduler = require("nonebot_plugin_apscheduler").scheduler | ||
except: | ||
|
@@ -43,7 +40,7 @@ | |
"example": "保存cos:保存cos图片至本地文件", | ||
"author": "divandia <[email protected]>", | ||
"version": "0.2.0", | ||
} | ||
}, | ||
) | ||
logo = """<g> | ||
/$$$$$$ /$$ /$$ /$$$$$$ | ||
|
@@ -56,12 +53,11 @@ | |
\______/ \_______/|__/ |__/|_______/ |__/ |__/|__/|__/ |__/ \______/ \______/ |_______/ | ||
</g>""" | ||
|
||
|
||
logger.opt(colors=True).info(logo) | ||
|
||
#用户cd数据 | ||
# 用户cd数据 | ||
user_data = {} | ||
CONFIG: Dict[str, Dict[str,str]] = {'原神':{},'崩坏3':{},'大别野':{},'星穹铁道':{}} | ||
CONFIG: Dict[str, Dict[str, str]] = {"原神": {}, "崩坏3": {}, "大别野": {}, "星穹铁道": {}} | ||
DRIVER = get_driver() | ||
|
||
# 读取配置文件 | ||
|
@@ -74,20 +70,73 @@ | |
with open(config_path, "w", encoding="utf8") as f: | ||
json.dump(CONFIG, f, ensure_ascii=False, indent=4) | ||
|
||
#事件响应器 | ||
download_cos = on_command('下载cos', aliases={'cos保存', '保存cos'}, block=False, priority=5, permission=SUPER_PERMISSION) | ||
hot_cos = on_command('热门cos', aliases={'热门coser', '热门cos图'}, block=False, priority=5) | ||
rank_cos = on_regex(r'^(日|月|周)榜cos[r]?[图]?(.+)?',priority=5, block=False, flags=re.I) | ||
latest_cos = on_command('最新cos', aliases={'最新coser', '最新cos图'}, block=False, priority=5) | ||
good_cos = on_command('精品cos', aliases={'精品coser', '精品cos图'}, block=False, priority=5) | ||
turn_aps = on_regex(r'^(开启|关闭)每日推送(原神|崩坏3|星穹铁道|大别野)(\s)?(.+)?', block=False, priority=5, flags=re.I, permission=SUPER_PERMISSION) | ||
show_aps = on_command('查看本群推送', aliases={'查看推送','查看订阅'}, block=False, priority=5, rule=to_me()) | ||
# 事件响应器 | ||
help = on_command("原神cos", aliases={"genshincos"}) | ||
download_cos = on_command( | ||
"下载cos", | ||
aliases={"cos保存", "保存cos"}, | ||
block=False, | ||
priority=5, | ||
permission=SUPER_PERMISSION, | ||
) | ||
hot_cos = on_command("热门cos", aliases={"热门coser", "热门cos图"}, block=False, priority=5) | ||
rank_cos = on_regex(r"^(日|月|周)榜cos[r]?[图]?(.+)?", priority=5, block=False, flags=re.I) | ||
latest_cos = on_command("最新cos", aliases={"最新coser", "最新cos图"}, block=False, priority=5) | ||
good_cos = on_command("精品cos", aliases={"精品coser", "精品cos图"}, block=False, priority=5) | ||
turn_aps = on_regex( | ||
r"^(开启|关闭)每日推送(原神|崩坏3|星穹铁道|大别野)(\s)?(.+)?", | ||
block=False, | ||
priority=5, | ||
flags=re.I, | ||
permission=SUPER_PERMISSION, | ||
) | ||
show_aps = on_command( | ||
"查看本群推送", aliases={"查看推送", "查看订阅"}, block=False, priority=5, rule=to_me() | ||
) | ||
|
||
Generated_pic = False | ||
Help_Path = Path(__file__).parent / "Help.jpeg" | ||
|
||
|
||
@help.handle() | ||
async def _(matcher: Matcher, event: MessageEvent, foo: Annotated[str, CommandStart()]): | ||
global Generated_pic | ||
command_start = str(foo) | ||
if not Generated_pic: | ||
menu = Menu( | ||
"原神cos", | ||
des="获取原神cos图片", | ||
funcs=Funcs( | ||
Func(f"{command_start}日、周、月榜cos", "获取排行榜cos图。如日榜cos 原神 x3") | ||
+ Func(f"{command_start}热门cos", "获取指定游戏热门cos图,如热门cos 原神 x3 ") | ||
+ Func(f"{command_start}最新cos", "获取指定游戏最新cos图,如最新cos 原神 x3 ") | ||
+ Func(f"{command_start}精品cos", "获取指定游戏精品cos图,如精品cos 原神 x3 ") | ||
+ Func(f"{command_start}查看本群推送", "查看本群的订阅cos目录") | ||
+ Func( | ||
f"{command_start}下载cos", "仅超管、群主、管理员可用\n获取指定游戏热门cos图,如热门cos 原神 x3 " | ||
) | ||
+ Func( | ||
f"{command_start}开启每日推送xx (时间)", | ||
"仅超管、群主、管理员可用\n如开启每日推送原神 8:30,注意时间的格式", | ||
) | ||
), | ||
) | ||
pic_bytes = await menu_render(Menus(menu), 700) | ||
with open(Help_Path, "wb") as f: | ||
f.write(pic_bytes) | ||
Generated_pic = True | ||
else: | ||
with open(Help_Path, "rb") as f: | ||
pic_bytes = f.read() | ||
await matcher.send(MessageSegment.image(pic_bytes)) | ||
|
||
|
||
@show_aps.handle() | ||
async def _(bot: Bot, event: GroupMessageEvent): | ||
send_msg = '本群订阅的推送有:\n' | ||
send_msg = "本群订阅的推送有:\n" | ||
for game_type, dict in CONFIG.items(): | ||
if game_type == "": continue | ||
if game_type == "": | ||
continue | ||
for goup_id, time in dict.items(): | ||
if str(event.group_id) == goup_id: | ||
send_msg += f"{game_type}的每日{time}推送\n" | ||
|
@@ -99,7 +148,7 @@ async def _(event: GroupMessageEvent, args: Tuple[str, ...] = RegexGroup()): | |
# 检查是否安装了apscheduler插件,并且是否开启了定时推送 | ||
if scheduler == None: | ||
await turn_aps.finish("未安装apscheduler插件,无法使用此功能") | ||
mode = args[0] | ||
mode = args[0] | ||
game_type = args[1] | ||
time = args[3] | ||
aps_group_id = str(event.group_id) | ||
|
@@ -114,7 +163,14 @@ async def _(event: GroupMessageEvent, args: Tuple[str, ...] = RegexGroup()): | |
else: | ||
CONFIG[name][aps_group_id] = time | ||
try: | ||
scheduler.add_job(aps_send, trigger="cron", hour=time.split(":")[0], minute=time.split(":")[1], id=f"{game_type}{aps_group_id}",args=(aps_group_id,)) | ||
scheduler.add_job( | ||
aps_send, | ||
trigger="cron", | ||
hour=time.split(":")[0], | ||
minute=time.split(":")[1], | ||
id=f"{game_type}{aps_group_id}", | ||
args=(aps_group_id,), | ||
) | ||
logger.debug(f"已成功添加{aps_group_id}的{game_type}定时推送") | ||
except Exception as e: | ||
logger.error(e) | ||
|
@@ -136,7 +192,9 @@ async def _(event: GroupMessageEvent, args: Tuple[str, ...] = RegexGroup()): | |
|
||
|
||
@hot_cos.handle() | ||
async def _(bot: Bot, matcher: Matcher, event: MessageEvent, arg: Message = CommandArg()): | ||
async def _( | ||
bot: Bot, matcher: Matcher, event: MessageEvent, arg: Message = CommandArg() | ||
): | ||
if not arg: | ||
await hot_cos.finish("请指定cos类型") | ||
args = arg.extract_plain_text().split() | ||
|
@@ -150,10 +208,16 @@ async def _(bot: Bot, matcher: Matcher, event: MessageEvent, arg: Message = Comm | |
send_type = starrail_hot | ||
else: | ||
await hot_cos.finish("暂不支持该类型") | ||
await send_images(bot,matcher,args,event,send_type) | ||
await send_images(bot, matcher, args, event, send_type) | ||
|
||
|
||
@rank_cos.handle() | ||
async def _(bot: Bot, matcher: Matcher, event: MessageEvent, group: Tuple[str, ...] = RegexGroup()): | ||
async def _( | ||
bot: Bot, | ||
matcher: Matcher, | ||
event: MessageEvent, | ||
group: Tuple[str, ...] = RegexGroup(), | ||
): | ||
if not group[1]: | ||
await rank_cos.finish("请指定cos类型") | ||
args = group[1].split() | ||
|
@@ -174,10 +238,13 @@ async def _(bot: Bot, matcher: Matcher, event: MessageEvent, group: Tuple[str, . | |
send_type = Rank(ForumType.StarRailCos, rank_type) | ||
else: | ||
await rank_cos.finish("暂不支持该类型") | ||
await send_images(bot,matcher,args,event,send_type) | ||
await send_images(bot, matcher, args, event, send_type) | ||
|
||
|
||
@latest_cos.handle() | ||
async def _(bot: Bot, matcher: Matcher, event: MessageEvent, arg: Message = CommandArg()): | ||
async def _( | ||
bot: Bot, matcher: Matcher, event: MessageEvent, arg: Message = CommandArg() | ||
): | ||
if not arg: | ||
await latest_cos.finish("请指定cos类型") | ||
args = arg.extract_plain_text().split() | ||
|
@@ -191,10 +258,13 @@ async def _(bot: Bot, matcher: Matcher, event: MessageEvent, arg: Message = Comm | |
send_type = starrail_latest_comment | ||
else: | ||
await latest_cos.finish("暂不支持该类型") | ||
await send_images(bot,matcher,args,event,send_type) | ||
await send_images(bot, matcher, args, event, send_type) | ||
|
||
|
||
@good_cos.handle() | ||
async def _(bot: Bot, matcher: Matcher, event: MessageEvent, arg: Message = CommandArg()): | ||
async def _( | ||
bot: Bot, matcher: Matcher, event: MessageEvent, arg: Message = CommandArg() | ||
): | ||
if not arg: | ||
await good_cos.finish("请指定cos类型") | ||
args = arg.extract_plain_text().split() | ||
|
@@ -208,10 +278,10 @@ async def _(bot: Bot, matcher: Matcher, event: MessageEvent, arg: Message = Comm | |
await good_cos.finish("星穹铁道暂不支持精品cos") | ||
else: | ||
await good_cos.finish("暂不支持该类型") | ||
await send_images(bot,matcher,args,event,send_type) | ||
await send_images(bot, matcher, args, event, send_type) | ||
|
||
|
||
@download_cos.got('game_type', prompt='你想下载哪种类型的,有原神和大别野,崩坏3,星穹铁道') | ||
@download_cos.got("game_type", prompt="你想下载哪种类型的,有原神和大别野,崩坏3,星穹铁道") | ||
async def got_type(game_type: str = ArgPlainText()): | ||
if game_type in GENSHIN_NAME: | ||
hot = genshin_hot | ||
|
@@ -223,26 +293,31 @@ async def got_type(game_type: str = ArgPlainText()): | |
hot = starrail_hot | ||
image_urls = await hot.async_get_urls() | ||
if not image_urls: | ||
await download_cos.finish(f'没有找到{game_type}的cos图片') | ||
await download_cos.finish(f"没有找到{game_type}的cos图片") | ||
else: | ||
await download_cos.send(f'正在下载{game_type}的cos图片') | ||
await download_cos.send(f"正在下载{game_type}的cos图片") | ||
try: | ||
await download_from_urls(image_urls, SAVE_PATH / f'{game_type}cos') | ||
await download_cos.finish(f'已成功保存{len(image_urls)}张{game_type}的cos图片') | ||
await download_from_urls(image_urls, SAVE_PATH / f"{game_type}cos") | ||
await download_cos.finish(f"已成功保存{len(image_urls)}张{game_type}的cos图片") | ||
except WriteError as e: | ||
await download_cos.finish(f'保存部分{game_type}的cos图片失败,原因:{e}') | ||
await download_cos.finish(f"保存部分{game_type}的cos图片失败,原因:{e}") | ||
|
||
|
||
########################################################################################### | ||
|
||
#定时任务 | ||
|
||
# 定时任务 | ||
async def aps_send(aps_goup_id: str): | ||
logger.debug("正在发送定时推送") | ||
bot: Bot = get_bot() | ||
for game_type, dict in CONFIG.items(): | ||
if game_type == "": | ||
continue | ||
for saved_group_id, time in dict.items(): | ||
if not (datetime.now().hour == int(time.split(":")[0]) and datetime.now().minute == int(time.split(":")[1])): | ||
if not ( | ||
datetime.now().hour == int(time.split(":")[0]) | ||
and datetime.now().minute == int(time.split(":")[1]) | ||
): | ||
continue | ||
elif saved_group_id != aps_goup_id: | ||
continue | ||
|
@@ -260,19 +335,29 @@ async def aps_send(aps_goup_id: str): | |
continue | ||
image_list = await send_type.async_get_urls(page_size=5) | ||
name_list = await send_type.async_get_name(page_size=5) | ||
rank_text = '\n'.join([f"{i+1}.{name_list[i]}" for i in range(len(name_list))]) | ||
rank_text = "\n".join( | ||
[f"{i + 1}.{name_list[i]}" for i in range(len(name_list))] | ||
) | ||
msg_list = [f"✅米游社{game_type}cos每日榜单✅"] | ||
msg_list.append(rank_text) | ||
msg_list.append([MessageSegment.image(img) for img in image_list]) | ||
msg_list = msglist2forward('米游社cos', '2854196306', msg_list) | ||
await bot.call_api("send_group_forward_msg", group_id=group_id, messages=msg_list) | ||
msg_list = msglist2forward("米游社cos", "2854196306", msg_list) | ||
await bot.call_api( | ||
"send_group_forward_msg", group_id=group_id, messages=msg_list | ||
) | ||
await asyncio.sleep(1) | ||
except Exception as e: | ||
logger.error(e) | ||
|
||
|
||
async def send_images(bot:Bot, matcher: Matcher, args: list, event: MessageEvent, send_type: HoyoBasicSpider): | ||
''' | ||
async def send_images( | ||
bot: Bot, | ||
matcher: Matcher, | ||
args: list, | ||
event: MessageEvent, | ||
send_type: HoyoBasicSpider, | ||
): | ||
""" | ||
发送图片 | ||
params: | ||
|
@@ -281,11 +366,11 @@ async def send_images(bot:Bot, matcher: Matcher, args: list, event: MessageEvent | |
args: 命令参数(0:类型 1:数量) | ||
event: 消息事件类型 | ||
send_type: 爬虫类型 | ||
''' | ||
""" | ||
global user_data | ||
out_cd, deletime, user_data = check_cd(event.user_id, user_data) | ||
if out_cd: | ||
if len(args)<2: | ||
if len(args) < 2: | ||
await matcher.send("获取图片中…请稍等") | ||
try: | ||
image_list = await send_type.async_get_urls() | ||
|
@@ -295,7 +380,7 @@ async def send_images(bot:Bot, matcher: Matcher, args: list, event: MessageEvent | |
else: | ||
num = int(re.sub(r"[x|*|X]", "", args[1])) | ||
num = num if num <= MAX else MAX | ||
msg_list = [f'✅找到最新的一些{args[0]}图如下:✅'] | ||
msg_list = [f"✅找到最新的一些{args[0]}图如下:✅"] | ||
image_list = await send_type.async_get_urls() | ||
if num > len(image_list): | ||
await matcher.finish(f"最多只能获取{len(image_list)}张图片", at_sender=True) | ||
|
@@ -308,6 +393,7 @@ async def send_images(bot:Bot, matcher: Matcher, args: list, event: MessageEvent | |
else: | ||
await matcher.finish(f"cd冷却中,还剩{deletime}秒", at_sender=True) | ||
|
||
|
||
@DRIVER.on_startup | ||
async def start_aps(): | ||
try: | ||
|
@@ -322,10 +408,17 @@ async def start_aps(): | |
if time == "": | ||
continue | ||
try: | ||
scheduler.add_job(aps_send, trigger="cron", hour=time.split(":")[0], minute=time.split(":")[1], id=f"{game_type}{aps_group_id}",args=(aps_group_id,)) | ||
scheduler.add_job( | ||
aps_send, | ||
trigger="cron", | ||
hour=time.split(":")[0], | ||
minute=time.split(":")[1], | ||
id=f"{game_type}{aps_group_id}", | ||
args=(aps_group_id,), | ||
) | ||
logger.debug(f"已成功添加{aps_group_id}的{game_type}定时推送") | ||
except Exception as e: | ||
logger.error(e) | ||
continue | ||
except Exception as e: | ||
logger.error(e) | ||
logger.error(e) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,18 +1,19 @@ | ||
[tool.poetry] | ||
name = "nonebot-plugin-genshin-cos" | ||
version = "0.2.2" | ||
version = "0.2.3" | ||
description = "米游社原神cos图获取" | ||
authors = ["Cvandia <[email protected]>"] | ||
license = "MIT" | ||
readme = "README.md" | ||
packages = [{include = "nonebot_plugin_genshin_cos"}] | ||
packages = [{ include = "nonebot_plugin_genshin_cos" }] | ||
|
||
[tool.poetry.dependencies] | ||
nonebot-plugin-templates = "^0.1.3" | ||
nonebot-plugin-apscheduler = ">=0.1.3" | ||
nonebot-adapter-onebot = ">=2.0.0-beta.1" | ||
python = ">=3.8,<4" | ||
nonebot2 = ">=2.0.0-beta.1" | ||
nonebot-adapter-onebot = ">=2.0.0-beta.1" | ||
httpx = ">=0.19.0" | ||
nonebot-plugin-apscheduler= ">=0.1.3" | ||
aiofiles = ">=0.7.0" | ||
pydantic = ">=1.10.2" | ||
pathlib = ">=1.0.1" | ||
|