diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md new file mode 100644 index 00000000..03b23f22 --- /dev/null +++ b/.github/pull_request_template.md @@ -0,0 +1,20 @@ +# 概述 + +**简要概述修改或更新的内容** + + +# 相关Issue +**与提交代码相关的Issue链接** + + +## 更新类型 + +- [x] BUG修复 (不影响项目结构和其他代码的小型BUG修复) +- [x] 新增功能 (不影响项目结构和其他代码的功能新增) +- [x] 重大修改 (可能影响项目结构或其他代码的重大变更) +- [x] 包含文档变更 + +## 其他备注 + +**其他需要注意的内容** + diff --git a/api/base.py b/api/base.py index 7c39dfcf..214d253a 100644 --- a/api/base.py +++ b/api/base.py @@ -1,106 +1,150 @@ # -*- coding: utf-8 -*- +""" +学习通API基础模块 +包含账号登录、课程获取、任务点处理等基础功能 +""" import re import time import random import requests from hashlib import md5 from requests.adapters import HTTPAdapter +from typing import Dict, List, Optional, Union, Any from api.cipher import AESCipher from api.logger import logger from api.cookies import save_cookies, use_cookies from api.process import show_progress from api.config import GlobalConst as gc -from api.decode import (decode_course_list, - decode_course_point, - decode_course_card, - decode_course_folder, - decode_questions_info - ) +from api.decode import ( + decode_course_list, + decode_course_point, + decode_course_card, + decode_course_folder, + decode_questions_info +) from api.answer import * -def get_timestamp(): + +def get_timestamp() -> str: + """获取毫秒级时间戳""" return str(int(time.time() * 1000)) -def get_random_seconds(): +def get_random_seconds() -> int: + """获取随机等待时间(30-90秒)""" return random.randint(30, 90) -def init_session(isVideo: bool = False, isAudio: bool = False): - _session = requests.session() - _session.verify = False - _session.mount('http://', HTTPAdapter(max_retries=3)) - _session.mount('https://', HTTPAdapter(max_retries=3)) - if isVideo: - _session.headers = gc.VIDEO_HEADERS - elif isAudio: - _session.headers = gc.AUDIO_HEADERS +def init_session(is_video: bool = False, is_audio: bool = False) -> requests.Session: + """ + 初始化requests会话 + + Args: + is_video: 是否为视频请求 + is_audio: 是否为音频请求 + + Returns: + 配置好的requests.Session对象 + """ + session = requests.session() + session.verify = False + # 配置重试机制 + adapter = HTTPAdapter(max_retries=3) + session.mount('http://', adapter) + session.mount('https://', adapter) + + # 根据请求类型设置对应headers + if is_video: + session.headers = gc.VIDEO_HEADERS + elif is_audio: + session.headers = gc.AUDIO_HEADERS else: - _session.headers = gc.HEADERS - _session.cookies.update(use_cookies()) - return _session + session.headers = gc.HEADERS + + session.cookies.update(use_cookies()) + return session class Account: - username = None - password = None - last_login = None - isSuccess = None - def __init__(self, _username, _password): - self.username = _username - self.password = _password + """账号信息类""" + def __init__(self, username: str, password: str): + self.username = username + self.password = password + self.last_login = None + self.is_success = None class Chaoxing: - def __init__(self, account: Account = None,tiku:Tiku=None): + """超星学习通API封装类""" + + def __init__(self, account: Optional[Account] = None, tiku: Optional['Tiku'] = None): self.account = account self.cipher = AESCipher() self.tiku = tiku - def login(self): - _session = requests.session() - _session.verify = False - _url = "https://passport2.chaoxing.com/fanyalogin" - _data = {"fid": "-1", - "uname": self.cipher.encrypt(self.account.username), - "password": self.cipher.encrypt(self.account.password), - "refer": "https%3A%2F%2Fi.chaoxing.com", - "t": True, - "forbidotherlogin": 0, - "validate": "", - "doubleFactorLogin": 0, - "independentId": 0 - } + def login(self) -> Dict[str, Union[bool, str]]: + """ + 登录学习通 + + Returns: + 登录结果字典 {"status": bool, "msg": str} + """ + session = requests.session() + session.verify = False + url = "https://passport2.chaoxing.com/fanyalogin" + + data = { + "fid": "-1", + "uname": self.cipher.encrypt(self.account.username), + "password": self.cipher.encrypt(self.account.password), + "refer": "https%3A%2F%2Fi.chaoxing.com", + "t": True, + "forbidotherlogin": 0, + "validate": "", + "doubleFactorLogin": 0, + "independentId": 0 + } + logger.trace("正在尝试登录...") - resp = _session.post(_url, headers=gc.HEADERS, data=_data) - if resp and resp.json()["status"] == True: - save_cookies(_session) + resp = session.post(url, headers=gc.HEADERS, data=data) + + if resp and resp.json()["status"]: + save_cookies(session) logger.info("登录成功...") return {"status": True, "msg": "登录成功"} - else: - return {"status": False, "msg": str(resp.json()["msg2"])} + return {"status": False, "msg": str(resp.json()["msg2"])} - def get_fid(self): - _session = init_session() - return _session.cookies.get("fid") + def get_fid(self) -> str: + """获取fid""" + return init_session().cookies.get("fid") - def get_uid(self): - _session = init_session() - return _session.cookies.get("_uid") + def get_uid(self) -> str: + """获取用户ID""" + return init_session().cookies.get("_uid") - def get_course_list(self): - _session = init_session() - _url = "https://mooc2-ans.chaoxing.com/mooc2-ans/visit/courselistdata" - _data = { + def get_course_list(self) -> List[Dict]: + """ + 获取课程列表 + + Returns: + 课程信息列表 + """ + session = init_session() + url = "https://mooc2-ans.chaoxing.com/mooc2-ans/visit/courselistdata" + + # 基础请求数据 + data = { "courseType": 1, "courseFolderId": 0, "query": "", "superstarClass": 0 } + logger.trace("正在读取所有的课程列表...") - # 接口突然抽风,增加headers - _headers = { + + # 设置专用headers + headers = { "Host": "mooc2-ans.chaoxing.com", "sec-ch-ua-platform": "\"Windows\"", "X-Requested-With": "XMLHttpRequest", @@ -116,138 +160,240 @@ def get_course_list(self): "Referer": "https://mooc2-ans.chaoxing.com/mooc2-ans/visit/interaction?moocDomain=https://mooc1-1.chaoxing.com/mooc-ans", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6,ja;q=0.5" } - _resp = _session.post(_url,headers=_headers,data=_data) - # logger.trace(f"原始课程列表内容:\n{_resp.text}") + + # 获取主课程列表 + resp = session.post(url, headers=headers, data=data) logger.info("课程列表读取完毕...") - course_list = decode_course_list(_resp.text) + course_list = decode_course_list(resp.text) - _interaction_url = "https://mooc2-ans.chaoxing.com/mooc2-ans/visit/interaction" - _interaction_resp = _session.get(_interaction_url) - course_folder = decode_course_folder(_interaction_resp.text) + # 获取文件夹中的课程 + interaction_url = "https://mooc2-ans.chaoxing.com/mooc2-ans/visit/interaction" + interaction_resp = session.get(interaction_url) + course_folder = decode_course_folder(interaction_resp.text) + + # 遍历文件夹获取课程 for folder in course_folder: - _data = { - "courseType": 1, - "courseFolderId": folder["id"], - "query": "", - "superstarClass": 0 - } - _resp = _session.post(_url, data=_data) - course_list += decode_course_list(_resp.text) + folder_data = data.copy() + folder_data["courseFolderId"] = folder["id"] + resp = session.post(url, data=folder_data) + course_list.extend(decode_course_list(resp.text)) + return course_list - def get_course_point(self, _courseid, _clazzid, _cpi): - _session = init_session() - _url = f"https://mooc2-ans.chaoxing.com/mooc2-ans/mycourse/studentcourse?courseid={_courseid}&clazzid={_clazzid}&cpi={_cpi}&ut=s" + def get_course_point(self, courseid: str, clazzid: str, cpi: str) -> List[Dict]: + """ + 获取课程章节信息 + + Args: + courseid: 课程ID + clazzid: 班级ID + cpi: 课程参数 + + Returns: + 章节信息列表 + """ + session = init_session() + url = f"https://mooc2-ans.chaoxing.com/mooc2-ans/mycourse/studentcourse?courseid={courseid}&clazzid={clazzid}&cpi={cpi}&ut=s" + logger.trace("开始读取课程所有章节...") - _resp = _session.get(_url) - # logger.trace(f"原始章节列表内容:\n{_resp.text}") + resp = session.get(url) logger.info("课程章节读取成功...") - return decode_course_point(_resp.text) + + return decode_course_point(resp.text) - def get_job_list(self, _clazzid, _courseid, _cpi, _knowledgeid): - _session = init_session() + def get_job_list(self, clazzid: str, courseid: str, cpi: str, knowledgeid: str) -> tuple[List[Dict], Dict]: + """ + 获取章节任务点列表 + + Args: + clazzid: 班级ID + courseid: 课程ID + cpi: 课程参数 + knowledgeid: 知识点ID + + Returns: + (任务点列表, 任务点信息) + """ + session = init_session() job_list = [] job_info = {} - for _possible_num in ["0", "1","2"]: # 学习界面任务卡片数,很少有3个的,但是对于章节解锁任务点少一个都不行,可以从API /mooc-ans/mycourse/studentstudyAjax获取值,或者干脆直接加,但二者都会造成额外的请求 - _url = f"https://mooc1.chaoxing.com/mooc-ans/knowledge/cards?clazzid={_clazzid}&courseid={_courseid}&knowledgeid={_knowledgeid}&num={_possible_num}&ut=s&cpi={_cpi}&v=20160407-3&mooc2=1" + + # 遍历可能的任务卡片数量 + for num in ["0", "1", "2"]: + url = f"https://mooc1.chaoxing.com/mooc-ans/knowledge/cards?clazzid={clazzid}&courseid={courseid}&knowledgeid={knowledgeid}&num={num}&ut=s&cpi={cpi}&v=20160407-3&mooc2=1" + logger.trace("开始读取章节所有任务点...") - _resp = _session.get(_url) - _job_list, _job_info = decode_course_card(_resp.text) - if _job_info.get('notOpen',False): - # 直接返回,节省一次请求 + resp = session.get(url) + tasks, info = decode_course_card(resp.text) + + # 检查章节是否开放 + if info.get('notOpen', False): logger.info("该章节未开放") - return [], _job_info - job_list += _job_list - job_info.update(_job_info) - # if _job_list and len(_job_list) != 0: - # break - # logger.trace(f"原始任务点列表内容:\n{_resp.text}") + return [], info + + job_list.extend(tasks) + job_info.update(info) + logger.info("章节任务点读取成功...") return job_list, job_info - def get_enc(self, clazzId, jobid, objectId, playingTime, duration, userid): - return md5( - f"[{clazzId}][{userid}][{jobid}][{objectId}][{playingTime * 1000}][d_yHJ!$pdA~5][{duration * 1000}][0_{duration}]" - .encode()).hexdigest() + def get_enc(self, clazz_id: str, jobid: str, object_id: str, playing_time: int, + duration: int, userid: str) -> str: + """ + 生成视频播放加密参数 + """ + raw = f"[{clazz_id}][{userid}][{jobid}][{object_id}][{playing_time * 1000}][d_yHJ!$pdA~5][{duration * 1000}][0_{duration}]" + return md5(raw.encode()).hexdigest() - def video_progress_log(self, _session, _course, _job, _job_info, _dtoken, _duration, _playingTime, _type: str = "Video"): - if "courseId" in _job['otherinfo']: - _mid_text = f"otherInfo={_job['otherinfo']}&" - else: - _mid_text = f"otherInfo={_job['otherinfo']}&courseId={_course['courseId']}&" - _success = False - for _possible_rt in ["0.9", "1"]: - _url = (f"https://mooc1.chaoxing.com/mooc-ans/multimedia/log/a/" - f"{_course['cpi']}/" - f"{_dtoken}?" - f"clazzId={_course['clazzId']}&" - f"playingTime={_playingTime}&" - f"duration={_duration}&" - f"clipTime=0_{_duration}&" - f"objectId={_job['objectid']}&" - f"{_mid_text}" - f"jobid={_job['jobid']}&" - f"userid={self.get_uid()}&" - f"isdrag=3&" - f"view=pc&" - f"enc={self.get_enc(_course['clazzId'], _job['jobid'], _job['objectid'], _playingTime, _duration, self.get_uid())}&" - f"rt={_possible_rt}&" - f"dtype={_type}&" - f"_t={get_timestamp()}") - resp = _session.get(_url) + def video_progress_log(self, session: requests.Session, course: Dict, job: Dict, + job_info: Dict, dtoken: str, duration: int, playing_time: int, + type_: str = "Video") -> Union[Dict, bool]: + """ + 记录视频播放进度 + + Args: + session: 请求会话 + course: 课程信息 + job: 任务信息 + job_info: 任务详细信息 + dtoken: 播放token + duration: 视频总时长 + playing_time: 当前播放时间 + type_: 媒体类型(Video/Audio) + + Returns: + 响应结果或False(失败) + """ + # 构建otherInfo参数 + other_info = (f"otherInfo={job['otherinfo']}&courseId={course['courseId']}&" + if "courseId" not in job['otherinfo'] + else f"otherInfo={job['otherinfo']}&") + + success = False + # 尝试不同的rt参数 + for rt in ["0.9", "1"]: + url = ( + f"https://mooc1.chaoxing.com/mooc-ans/multimedia/log/a/{course['cpi']}/{dtoken}?" + f"clazzId={course['clazzId']}&" + f"playingTime={playing_time}&" + f"duration={duration}&" + f"clipTime=0_{duration}&" + f"objectId={job['objectid']}&" + f"{other_info}" + f"jobid={job['jobid']}&" + f"userid={self.get_uid()}&" + f"isdrag=3&" + f"view=pc&" + f"enc={self.get_enc(course['clazzId'], job['jobid'], job['objectid'], playing_time, duration, self.get_uid())}&" + f"rt={rt}&" + f"dtype={type_}&" + f"_t={get_timestamp()}" + ) + + resp = session.get(url) if resp.status_code == 200: - _success = True - break # 如果返回为200正常,则跳出循环 - elif resp.status_code == 403: - continue # 如果出现403无权限报错,则继续尝试不同的rt参数 - if _success: + success = True + break + + if success: return resp.json() - else: - # 若出现两个rt参数都返回403的情况,则跳过当前任务 - logger.warning("出现403报错,尝试修复无效,正在跳过当前任务点...") - return False + + logger.warning("出现403报错,尝试修复无效,正在跳过当前任务点...") + return False - def study_video(self, _course, _job, _job_info, _speed: float = 1.0, _type: str = "Video"): - if _type == "Video": - _session = init_session(isVideo=True) - else: - _session = init_session(isAudio=True) - _session.headers.update() - _info_url = f"https://mooc1.chaoxing.com/ananas/status/{_job['objectid']}?k={self.get_fid()}&flag=normal" - _video_info = _session.get(_info_url).json() - if _video_info["status"] == "success": - _dtoken = _video_info["dtoken"] - _duration = _video_info["duration"] - _crc = _video_info["crc"] - _key = _video_info["key"] - _isPassed = False - _isFinished = False - _playingTime = 0 - logger.info(f"开始任务: {_job['name']}, 总时长: {_duration}秒") - while not _isFinished: - if _isFinished: - _playingTime = _duration - _isPassed = self.video_progress_log(_session, _course, _job, _job_info, _dtoken, _duration, _playingTime, _type) - if not _isPassed or (_isPassed and _isPassed["isPassed"]): + def study_video(self, course: Dict, job: Dict, job_info: Dict, + speed: float = 1.0, type_: str = "Video") -> None: + """ + 学习视频/音频任务 + + Args: + course: 课程信息 + job: 任务信息 + job_info: 任务详细信息 + speed: 播放速度 + type_: 媒体类型(Video/Audio) + """ + # 初始化会话 + session = init_session(is_video=(type_=="Video"), is_audio=(type_=="Audio")) + + # 获取视频信息 + info_url = f"https://mooc1.chaoxing.com/ananas/status/{job['objectid']}?k={self.get_fid()}&flag=normal" + video_info = session.get(info_url).json() + + if "status" not in video_info: + logger.info("获取视频信息失败,跳过当前任务点...") + return None + + if video_info["status"] == "success": + dtoken = video_info["dtoken"] + duration = video_info["duration"] + + is_passed = False + is_finished = False + playing_time = 0 + + logger.info(f"开始任务: {job['name']}, 总时长: {duration}秒") + + # 循环提交播放进度 + while not is_finished: + if is_finished: + playing_time = duration + + is_passed = self.video_progress_log( + session, course, job, job_info, dtoken, + duration, playing_time, type_ + ) + + if not is_passed or (is_passed and is_passed["isPassed"]): break - _wait_time = get_random_seconds() - if _playingTime + _wait_time >= int(_duration): - _wait_time = int(_duration) - _playingTime - _isFinished = True - # 播放进度条 - show_progress(_job['name'], _playingTime, _wait_time, _duration, _speed) - _playingTime += _wait_time + + # 计算等待时间 + wait_time = get_random_seconds() + if playing_time + wait_time >= int(duration): + wait_time = int(duration) - playing_time + is_finished = True + + # 显示进度 + show_progress(job['name'], playing_time, wait_time, duration, speed) + playing_time += wait_time + print("\r", end="", flush=True) - logger.info(f"任务完成: {_job['name']}") - - def study_document(self, _course, _job): - _session = init_session() - _url = f"https://mooc1.chaoxing.com/ananas/job/document?jobid={_job['jobid']}&knowledgeid={re.findall(r'nodeId_(.*?)-', _job['otherinfo'])[0]}&courseid={_course['courseId']}&clazzid={_course['clazzId']}&jtoken={_job['jtoken']}&_dc={get_timestamp()}" - _resp = _session.get(_url) - - def study_work(self, _course, _job,_job_info) -> None: + logger.info(f"任务完成: {job['name']}") + + def study_document(self, course: Dict, job: Dict) -> None: + """ + 学习文档任务 + + Args: + course: 课程信息 + job: 任务信息 + """ + session = init_session() + url = (f"https://mooc1.chaoxing.com/ananas/job/document?" + f"jobid={job['jobid']}&" + f"knowledgeid={re.findall(r'nodeId_(.*?)-', job['otherinfo'])[0]}&" + f"courseid={course['courseId']}&" + f"clazzid={course['clazzId']}&" + f"jtoken={job['jtoken']}&" + f"_dc={get_timestamp()}") + session.get(url) + + def study_work(self, course: Dict, job: Dict, job_info: Dict) -> None: + """ + 完成作业任务 + + Args: + course: 课程信息 + job: 任务信息 + job_info: 任务详细信息 + """ + if not self.tiku: + logger.info("未配置题库,跳过答题") + return None + if self.tiku.DISABLE or not self.tiku: return None + def random_answer(options:str) -> str: answer = '' if not options: @@ -306,20 +452,20 @@ def multi_cut(answer:str) -> list[str]: verify=False, params = { "api": "1", - "workId": _job['jobid'].replace("work-",""), - "jobid": _job['jobid'], - "originJobId": _job['jobid'], + "workId": job['jobid'].replace("work-",""), + "jobid": job['jobid'], + "originJobId": job['jobid'], "needRedirect": "true", "skipHeader": "true", - "knowledgeid": str(_job_info['knowledgeid']), - 'ktoken': _job_info['ktoken'], - "cpi": _job_info['cpi'], + "knowledgeid": str(job_info['knowledgeid']), + 'ktoken': job_info['ktoken'], + "cpi": job_info['cpi'], "ut": "s", - "clazzId": _course['clazzId'], + "clazzId": course['clazzId'], "type": "", - "enc": _job['enc'], + "enc": job['enc'], "mooc2": "1", - "courseid": _course['courseId'] + "courseid": course['courseId'] } ) questions = decode_questions_info(_resp.text) # 加载题目信息 @@ -397,25 +543,29 @@ def multi_cut(answer:str) -> list[str]: else: logger.error(f"提交答题失败 -> {res.text}") - def strdy_read(self, _course, _job,_job_info) -> None: + def strdy_read(self, course: Dict, job: Dict, job_info: Dict) -> None: """ - 阅读任务学习,仅完成任务点,并不增长时长 + 完成阅读任务(仅完成任务点,不增加时长) + + Args: + course: 课程信息 + job: 任务信息 + job_info: 任务详细信息 """ - _session = init_session() - _resp = _session.get( + session = init_session() + resp = session.get( url="https://mooc1.chaoxing.com/ananas/job/readv2", params={ - 'jobid': _job['jobid'], - 'knowledgeid':_job_info['knowledgeid'], - 'jtoken': _job['jtoken'], - 'courseid': _course['courseId'], - 'clazzid': _course['clazzId'] + 'jobid': job['jobid'], + 'knowledgeid': job_info['knowledgeid'], + 'jtoken': job['jtoken'], + 'courseid': course['courseId'], + 'clazzid': course['clazzId'] } ) - if _resp.status_code != 200: - logger.error(f"阅读任务学习失败 -> [{_resp.status_code}]{_resp.text}") + + if resp.status_code != 200: + logger.error(f"阅读任务学习失败 -> [{resp.status_code}]{resp.text}") else: - _resp_json = _resp.json() - logger.info(f"阅读任务学习 -> {_resp_json['msg']}") - - + resp_json = resp.json() + logger.info(f"阅读任务学习 -> {resp_json['msg']}") diff --git a/api/cipher.py b/api/cipher.py index e06e7cdd..e3b2502c 100644 --- a/api/cipher.py +++ b/api/cipher.py @@ -1,54 +1,86 @@ # -*- coding:utf-8 -*- -# -*- coding: utf-8 -*- +""" +AES加密模块 +提供PKCS7填充和AES-CBC模式加密功能 +""" + import base64 +from typing import List, Union import pyaes from api.config import GlobalConst as gc -def pkcs7_unpadding(string): - return string[0:-ord(string[-1])] +def pkcs7_unpadding(data: bytes) -> bytes: + """ + 移除PKCS7填充 + + Args: + data: 需要移除填充的字节串 + + Returns: + 移除填充后的字节串 + """ + padding_length = data[-1] + return data[:-padding_length] -def pkcs7_padding(s, block_size=16): - bs = block_size - return s + (bs - len(s) % bs) * chr(bs - len(s) % bs).encode() +def pkcs7_padding(data: bytes, block_size: int = 16) -> bytes: + """ + 添加PKCS7填充 + + Args: + data: 需要填充的字节串 + block_size: 块大小,默认16字节 + + Returns: + 填充后的字节串 + """ + padding_length = block_size - len(data) % block_size + padding = bytes([padding_length] * padding_length) + return data + padding -def split_to_data_blocks(byte_str, block_size=16): - length = len(byte_str) - j, y = divmod(length, block_size) - blocks = [] - shenyu = j * block_size - for i in range(j): - start = i * block_size - end = (i + 1) * block_size - blocks.append(byte_str[start:end]) - stext = byte_str[shenyu:] - if stext: - blocks.append(stext) - return blocks +def split_to_blocks(data: bytes, block_size: int = 16) -> List[bytes]: + """ + 将字节串分割为固定大小的块 + + Args: + data: 需要分割的字节串 + block_size: 块大小,默认16字节 + + Returns: + 分割后的块列表 + """ + return [data[i:i + block_size] for i in range(0, len(data), block_size)] -class AESCipher(): +class AESCipher: + """AES加密类,使用CBC模式""" + def __init__(self): - self.key = str(gc.AESKey).encode("utf8") - self.iv = str(gc.AESKey).encode("utf8") - - def encrypt(self, plaintext: str): - ciphertext = b'' + """初始化密钥和IV向量""" + self.key = str(gc.AESKey).encode("utf-8") + self.iv = str(gc.AESKey).encode("utf-8") + + def encrypt(self, plaintext: str) -> str: + """ + 加密明文 + + Args: + plaintext: 待加密的字符串 + + Returns: + Base64编码的密文 + """ + # 初始化CBC模式 cbc = pyaes.AESModeOfOperationCBC(self.key, self.iv) - plaintext = plaintext.encode('utf-8') - blocks = split_to_data_blocks(pkcs7_padding(plaintext)) - for b in blocks: - ciphertext = ciphertext + cbc.encrypt(b) - base64_text = base64.b64encode(ciphertext).decode("utf8") - return base64_text - - # def decrypt(self, ciphertext: str): - # cbc = pyaes.AESModeOfOperationCBC(self.key, self.iv) - # ciphertext.encode('utf8') - # ciphertext = base64.b64decode(ciphertext) - # ptext = b"" - # for b in split_to_data_blocks(ciphertext): - # ptext = ptext + cbc.decrypt(b) - # return pkcs7_unpadding(ptext.decode()) \ No newline at end of file + + # 对明文进行填充并分块 + padded_data = pkcs7_padding(plaintext.encode('utf-8')) + blocks = split_to_blocks(padded_data) + + # 加密所有数据块 + ciphertext = b''.join(cbc.encrypt(block) for block in blocks) + + # Base64编码并返回 + return base64.b64encode(ciphertext).decode("utf-8") \ No newline at end of file diff --git a/api/decode.py b/api/decode.py index 7fe74639..a88d9506 100644 --- a/api/decode.py +++ b/api/decode.py @@ -1,231 +1,306 @@ # -*- coding: utf-8 -*- +""" +学习通解码模块 +用于解析和处理学习通页面中的数据 +包含课程列表、文件夹、章节点、任务卡片、题目等内容的解析 +""" + import re import json from bs4 import BeautifulSoup from api.logger import logger from api.font_decoder import FontDecoder -def decode_course_list(_text): + +def decode_course_list(html_text: str) -> list: + """解析课程列表页面,提取课程信息 + + Args: + html_text: 课程列表页面的HTML文本 + + Returns: + list: 包含所有课程信息的列表,每个课程为一个字典 + """ logger.trace("开始解码课程列表...") - _soup = BeautifulSoup(_text, "lxml") - _raw_courses = _soup.select("div.course") - _course_list = list() - for course in _raw_courses: - if not course.select_one("a.not-open-tip") and not course.select_one("div.not-open-tip"): - _course_detail = {} - _course_detail["id"] = course.attrs["id"] - _course_detail["info"] = course.attrs["info"] - _course_detail["roleid"] = course.attrs["roleid"] - - _course_detail["clazzId"] = course.select_one("input.clazzId").attrs["value"] - _course_detail["courseId"] = course.select_one("input.courseId").attrs["value"] - _course_detail["cpi"] = re.findall(r"cpi=(.*?)&", course.select_one("a").attrs["href"])[0] - _course_detail["title"] = course.select_one("span.course-name").attrs["title"] - if course.select_one("p.margint10") is None: - _course_detail["desc"] = '' - else: - _course_detail["desc"] = course.select_one("p.margint10").attrs["title"] - _course_detail["teacher"] = course.select_one("p.color3").attrs["title"] - _course_list.append(_course_detail) - return _course_list - -def decode_course_folder(_text): + soup = BeautifulSoup(html_text, "lxml") + courses = [] + + for course in soup.select("div.course"): + # 跳过未开放的课程 + if course.select_one("a.not-open-tip") or course.select_one("div.not-open-tip"): + continue + + course_info = { + "id": course.attrs["id"], + "info": course.attrs["info"], + "roleid": course.attrs["roleid"], + "clazzId": course.select_one("input.clazzId").attrs["value"], + "courseId": course.select_one("input.courseId").attrs["value"], + "cpi": re.findall(r"cpi=(.*?)&", course.select_one("a").attrs["href"])[0], + "title": course.select_one("span.course-name").attrs["title"], + "desc": course.select_one("p.margint10").attrs["title"] if course.select_one("p.margint10") else '', + "teacher": course.select_one("p.color3").attrs["title"] + } + courses.append(course_info) + + return courses + + +def decode_course_folder(html_text: str) -> list: + """解析二级课程文件夹列表 + + Args: + html_text: 文件夹页面的HTML文本 + + Returns: + list: 包含所有文件夹信息的列表 + """ logger.trace("开始解码二级课程列表...") - _soup = BeautifulSoup(_text, "lxml") - _raw_courses = _soup.select("ul.file-list>li") - _course_folder_list = list() - for course in _raw_courses: + soup = BeautifulSoup(html_text, "lxml") + folders = [] + + for course in soup.select("ul.file-list>li"): if course.attrs["fileid"]: - _course_folder_detail = {} - _course_folder_detail["id"] = course.attrs["fileid"] - _course_folder_detail["rename"] = course.select_one("input.rename-input").attrs["value"] - _course_folder_list.append(_course_folder_detail) - return _course_folder_list + folder = { + "id": course.attrs["fileid"], + "rename": course.select_one("input.rename-input").attrs["value"] + } + folders.append(folder) + + return folders + -def decode_course_point(_text): +def decode_course_point(html_text: str) -> dict: + """解析课程章节列表 + + Args: + html_text: 章节列表页面的HTML文本 + + Returns: + dict: 包含章节点信息的字典,格式为: + { + "hasLocked": bool, # 是否有需要解锁的章节 + "points": list # 章节点列表 + } + """ logger.trace("开始解码章节列表...") - _soup = BeautifulSoup(_text, "lxml") - _course_point = { - "hasLocked": False, # 用于判断该课程任务是否是需要解锁 + soup = BeautifulSoup(html_text, "lxml") + course_point = { + "hasLocked": False, "points": [] } - - for _chapter_unit in _soup.find_all("div",class_="chapter_unit") : - _point_list = [] - _raw_points = _chapter_unit.find_all("li") - for _point in _raw_points: - _point = _point.div - if (not "id" in _point.attrs): + for chapter in soup.find_all("div", class_="chapter_unit"): + points = [] + for point_li in chapter.find_all("li"): + point_div = point_li.div + if not "id" in point_div.attrs: continue - _point_detail = {} - _point_detail["id"] = re.findall(r"^cur(\d{1,20})$", _point.attrs["id"])[0] - _point_detail["title"] = _point.select_one("a.clicktitle").text.replace("\n",'').strip(' ') - _point_detail["jobCount"] = 1 # 默认为1 - if _point.select_one("input.knowledgeJobCount"): - _point_detail["jobCount"] = _point.select_one("input.knowledgeJobCount").attrs["value"] - else: - # 判断是不是因为需要解锁 - if '解锁' in _point.select_one("span.bntHoverTips").text: - _course_point["hasLocked"] = True + + point = { + "id": re.findall(r"^cur(\d{1,20})$", point_div.attrs["id"])[0], + "title": point_div.select_one("a.clicktitle").text.replace("\n",'').strip(), + "jobCount": 1 # 默认任务点数为1 + } + + # 获取实际任务点数或检查是否需要解锁 + job_count_input = point_div.select_one("input.knowledgeJobCount") + if job_count_input: + point["jobCount"] = job_count_input.attrs["value"] + elif '解锁' in point_div.select_one("span.bntHoverTips").text: + course_point["hasLocked"] = True + + points.append(point) - _point_list.append(_point_detail) - _course_point["points"]+=_point_list - return _course_point + course_point["points"].extend(points) + + return course_point -def decode_course_card(_text: str): +def decode_course_card(html_text: str) -> tuple: + """解析课程任务卡片列表 + + Args: + html_text: 任务卡片页面的HTML文本 + + Returns: + tuple: (任务列表, 任务信息) + 任务列表包含所有未完成的任务 + 任务信息包含ktoken等必要参数 + """ logger.trace("开始解码任务点列表...") - _job_info = {} - _job_list = [] - # 对于未开放章节检测 - if '章节未开放' in _text: - _job_info['notOpen'] = True - return [],_job_info - _temp = re.findall(r"mArg=\{(.*?)\};", _text.replace(" ", "")) - if _temp: - _temp = _temp[0] - else: - return [],{} - _cards = json.loads("{" + _temp + "}") - - if _cards: - _job_info = {} - _job_info["ktoken"] = _cards["defaults"]["ktoken"] - _job_info["mtEnc"] = _cards["defaults"]["mtEnc"] - _job_info["reportTimeInterval"] = _cards["defaults"]["reportTimeInterval"] # 60 - _job_info["defenc"] = _cards["defaults"]["defenc"] - _job_info["cardid"] = _cards["defaults"]["cardid"] - _job_info["cpi"] = _cards["defaults"]["cpi"] - _job_info["qnenc"] = _cards["defaults"]["qnenc"] - _job_info['knowledgeid'] = _cards["defaults"]["knowledgeid"] - _cards = _cards["attachments"] - _job_list = [] - for _card in _cards: - # 已经通过的任务 - if "isPassed" in _card and _card["isPassed"] is True: - continue - # 不属于任务点的任务 - if "job" not in _card or _card["job"] is False: - if _card.get('type') and _card['type'] == "read": - # 发现有在视频任务下掺杂阅读任务,不完成可能会导致无法开启下一章节 - if _card['property'].get('read',False): - # 已阅读,跳过 - continue - _job = {} - _job['title'] = _card['property']['title'] - _job["type"] = "read" - _job['id'] = _card['property']['id'] - _job["jobid"] = _card["jobid"] - _job["jtoken"] = _card["jtoken"] - _job['mid'] = _card['mid'] - _job['otherinfo'] = _card["otherInfo"] - _job['enc'] = _card["enc"] - _job['aid'] = _card["aid"] - _job_list.append(_job) - continue - # 视频任务 - if _card["type"] == "video": - _job = {} - _job["type"] = "video" - _job["jobid"] = _card["jobid"] - _job["name"] = _card["property"]["name"] - _job["otherinfo"] = _card["otherInfo"] - try: - _job["mid"] = _card["mid"] - except KeyError: - logger.warning("出现转码失败视频,已跳过...") - continue - _job["objectid"] = _card["objectId"] - _job["aid"] = _card["aid"] - # _job["doublespeed"] = _card["property"]["doublespeed"] - _job_list.append(_job) - continue - if _card["type"] == "document": - _job = {} - _job["type"] = "document" - _job["jobid"] = _card["jobid"] - _job["otherinfo"] = _card["otherInfo"] - _job["jtoken"] = _card["jtoken"] - _job["mid"] = _card["mid"] - _job["enc"] = _card["enc"] - _job["aid"] = _card["aid"] - _job["objectid"] = _card["property"]["objectid"] - _job_list.append(_job) - continue - if _card["type"] == "workid": - # 章节检测 - _job = {} - _job["type"] = "workid" - _job["jobid"] = _card["jobid"] - _job["otherinfo"] = _card["otherInfo"] - _job["mid"] = _card["mid"] - _job["enc"] = _card["enc"] - _job["aid"] = _card["aid"] - _job_list.append(_job) - continue - - if _card["type"] == "vote": - # 调查问卷 同上 - continue - return _job_list, _job_info + # 检查章节是否开放 + if '章节未开放' in html_text: + return [], {'notOpen': True} + + # 提取任务卡片数据 + card_data = re.findall(r"mArg=\{(.*?)\};", html_text.replace(" ", "")) + if not card_data: + return [], {} + + cards = json.loads("{" + card_data[0] + "}") + if not cards: + return [], {} + + # 提取任务信息 + job_info = { + key: cards["defaults"][key] + for key in ["ktoken", "mtEnc", "reportTimeInterval", "defenc", + "cardid", "cpi", "qnenc", "knowledgeid"] + } + # 解析任务列表 + job_list = [] + for card in cards["attachments"]: + # 跳过已完成任务 + if card.get("isPassed"): + continue + + # 处理非任务点内容 + if not card.get("job"): + if _should_process_read_task(card): + job_list.append(_parse_read_task(card)) + continue + + # 根据任务类型解析 + job_parser = { + "video": _parse_video_task, + "document": _parse_document_task, + "workid": _parse_workid_task + }.get(card["type"]) + + if job_parser: + job = job_parser(card) + if job: + job_list.append(job) + + return job_list, job_info + -def decode_questions_info(html_content) -> dict: - def replace_rtn(text): +def decode_questions_info(html_content: str) -> dict: + """解析试题信息 + + Args: + html_content: 试题页面的HTML内容 + + Returns: + dict: 包含试题信息的字典 + """ + def replace_rtn(text: str) -> str: + """清理文本中的特殊字符""" return text.replace('\r', '').replace('\t', '').replace('\n', '') soup = BeautifulSoup(html_content, "lxml") - form_data = {} form_tag = soup.find("form") - - fd = FontDecoder(html_content) # 加载字体 + fd = FontDecoder(html_content) # 加载字体解码器 - # 抽取表单信息 - for input_tag in form_tag.find_all("input"): - if 'name' not in input_tag.attrs or 'answer' in input_tag.attrs["name"]: - continue - form_data.update({ - input_tag.attrs["name"]: input_tag.attrs.get("value",'') - }) - + # 提取表单基本信息 + form_data = { + input_tag.attrs["name"]: input_tag.attrs.get("value",'') + for input_tag in form_tag.find_all("input") + if 'name' in input_tag.attrs and 'answer' not in input_tag.attrs["name"] + } + + # 题型映射表 + QUESTION_TYPES = { + '0': 'single', # 单选题 + '1': 'multiple', # 多选题 + '2': 'completion', # 填空题 + '3': 'judgement' # 判断题 + } + + # 解析试题 form_data['questions'] = [] - for div_tag in form_tag.find_all("div",class_="singleQuesId"): # 目前来说无论是单选还是多选的题class都是这个 - q_title = replace_rtn(fd.decode(div_tag.find("div", class_="Zy_TItle").text)) - q_options = '' - for li_tag in div_tag.find("ul").find_all("li"): - q_options += replace_rtn(fd.decode(li_tag.text))+'\n' - q_options=q_options[:-1] # 去除尾部'\n' - - # 尝试使用 data 属性来判断题型 - q_type_code = div_tag.find('div',class_='TiMu').attrs['data'] - q_type = '' - # 此处可能需要完善更多题型的判断 - if q_type_code == '0': - q_type = 'single' - elif q_type_code == '1': - q_type = 'multiple' - elif q_type_code == '2': - q_type = 'completion' - elif q_type_code == '3': - q_type = 'judgement' - else: - logger.info("未知题型代码 -> "+q_type_code) - q_type = 'unknown' # 避免出现未定义取值错误 - - form_data["questions"].append({ + for div_tag in form_tag.find_all("div", class_="singleQuesId"): + # 提取题目和选项 + title = replace_rtn(fd.decode(div_tag.find("div", class_="Zy_TItle").text)) + options = '\n'.join( + replace_rtn(fd.decode(li.text)) + for li in div_tag.find("ul").find_all("li") + ) + + # 获取题型 + type_code = div_tag.find('div', class_='TiMu').attrs['data'] + q_type = QUESTION_TYPES.get(type_code, 'unknown') + if q_type == 'unknown': + logger.info(f"未知题型代码 -> {type_code}") + + # 构建题目数据 + question = { 'id': div_tag.attrs["data"], - 'title':q_title, # 题目 - 'options':q_options, # 选项 可提供给题库作为辅助 - 'type': q_type, # 题型 可提供给题库作为辅助 - 'answerField':{ - 'answer'+div_tag.attrs["data"]:'', # 答案填入处 - 'answertype'+div_tag.attrs["data"]:q_type_code + 'title': title, + 'options': options, + 'type': q_type, + 'answerField': { + f'answer{div_tag.attrs["data"]}': '', + f'answertype{div_tag.attrs["data"]}': type_code } - }) - # 处理答题信息 - form_data['answerwqbid'] = ",".join([q['id'] for q in form_data['questions']])+"," + } + form_data['questions'].append(question) + + # 添加答题ID列表 + form_data['answerwqbid'] = ','.join(q['id'] for q in form_data['questions']) + ',' + return form_data +# 辅助函数 +def _should_process_read_task(card: dict) -> bool: + """判断是否需要处理阅读任务""" + return (card.get('type') == "read" and + not card.get('property', {}).get('read', False)) + +def _parse_read_task(card: dict) -> dict: + """解析阅读任务""" + return { + 'title': card['property']['title'], + 'type': 'read', + 'id': card['property']['id'], + 'jobid': card['jobid'], + 'jtoken': card['jtoken'], + 'mid': card['mid'], + 'otherinfo': card['otherInfo'], + 'enc': card['enc'], + 'aid': card['aid'] + } + +def _parse_video_task(card: dict) -> dict: + """解析视频任务""" + try: + return { + 'type': 'video', + 'jobid': card['jobid'], + 'name': card['property']['name'], + 'otherinfo': card['otherInfo'], + 'mid': card['mid'], + 'objectid': card['objectId'], + 'aid': card['aid'] + } + except KeyError: + logger.warning("出现转码失败视频,已跳过...") + return None + +def _parse_document_task(card: dict) -> dict: + """解析文档任务""" + return { + 'type': 'document', + 'jobid': card['jobid'], + 'otherinfo': card['otherInfo'], + 'jtoken': card['jtoken'], + 'mid': card['mid'], + 'enc': card['enc'], + 'aid': card['aid'], + 'objectid': card['property']['objectid'] + } + +def _parse_workid_task(card: dict) -> dict: + """解析章节测验任务""" + return { + 'type': 'workid', + 'jobid': card['jobid'], + 'otherinfo': card['otherInfo'], + 'mid': card['mid'], + 'enc': card['enc'], + 'aid': card['aid'] + } diff --git a/main.py b/main.py index 3bc8800a..2630ddf9 100644 --- a/main.py +++ b/main.py @@ -1,194 +1,386 @@ +#!/usr/bin/env python3 # -*- coding: utf-8 -*- + +""" +超星学习通自动化学习脚本 +功能: +- 自动完成视频/音频任务 +- 自动完成文档任务 +- 自动完成测验任务(需配置题库) +- 自动完成阅读任务 +""" + import argparse import configparser +import os +import traceback +from dataclasses import dataclass +from typing import Optional, Tuple, List, Dict, Any, Union + +from urllib3 import disable_warnings, exceptions + from api.logger import logger from api.base import Chaoxing, Account -from api.exceptions import LoginError, FormatError, JSONDecodeError,MaxRollBackError from api.answer import Tiku -from urllib3 import disable_warnings,exceptions -import os +from api.exceptions import LoginError, FormatError, JSONDecodeError, MaxRollBackError -# 定义全局变量,用于存储配置文件路径 -textPath = './resource/BookID.txt' +# 关闭 SSL 警告 +disable_warnings(exceptions.InsecureRequestWarning) -# 获取文本 -> 用于查看学习过的课程ID -def getText(): - try: - if not os.path.exists(textPath): - with open(textPath, 'x') as file: pass - return [] - with open(textPath, 'r', encoding='utf-8') as file: content = file.read().split(',') - content = {int(item.strip()) for item in content if item.strip()} - return list(content) - except Exception as e: logger.error(f"获取文本失败: {e}"); return [] +@dataclass +class Config: + """配置数据类""" + username: str + password: str + course_list: Optional[List[str]] + speed: float + tiku_config: Optional[Dict[str, Any]] -# 追加文本 -> 用于记录学习过的课程ID -def appendText(text): - if not os.path.exists(textPath): return - with open(textPath, 'a', encoding='utf-8') as file: file.write(f'{text}, ') +class CourseManager: + """课程管理类""" + def __init__(self, chaoxing: Chaoxing): + self.chaoxing = chaoxing + self.all_courses: List[Dict] = [] + self.selected_courses: List[Dict] = [] -# 关闭警告 -disable_warnings(exceptions.InsecureRequestWarning) + def load_courses(self) -> None: + """加载所有课程""" + self.all_courses = self.chaoxing.get_course_list() + + def select_courses(self, course_list: Optional[List[str]] = None) -> None: + """ + 选择要学习的课程 + + Args: + course_list: 课程ID列表,为None时通过交互方式选择 + """ + if not course_list: + self._interactive_select() + else: + self._filter_courses(course_list) + + if not self.selected_courses: + self.selected_courses = self.all_courses + + logger.info(f"课程列表过滤完毕,当前课程任务数量: {len(self.selected_courses)}") -def init_config(): - parser = argparse.ArgumentParser(description='Samueli924/chaoxing') # 命令行传参 - parser.add_argument("-c", "--config", type=str, default=None, help="使用配置文件运行程序") - parser.add_argument("-u", "--username", type=str, default=None, help="手机号账号") - parser.add_argument("-p", "--password", type=str, default=None, help="登录密码") - parser.add_argument("-l", "--list", type=str, default=None, help="要学习的课程ID列表") - parser.add_argument("-s", "--speed", type=float, default=1.0, help="视频播放倍速(默认1,最大2)") - args = parser.parse_args() - if args.config: - config = configparser.ConfigParser() - config.read(args.config, encoding="utf8") - return (config.get("common", "username"), - config.get("common", "password"), - str(config.get("common", "course_list")).split(",") if config.get("common", "course_list") else None, - int(config.get("common", "speed")), - config['tiku'] - ) - else: - return (args.username, args.password, args.list.split(",") if args.list else None, int(args.speed) if args.speed else 1,None) + def _interactive_select(self) -> None: + """交互式选择课程""" + print("*" * 10 + "课程列表" + "*" * 10) + for course in self.all_courses: + print(f"ID: {course['courseId']} 课程名: {course['title']}") + print("*" * 28) + + try: + course_input = input("请输入想要学习的课程列表,以逗号分隔,例: 2151141,189191,198198\n").strip() + if course_input: + course_list = [c.strip() for c in course_input.split(",")] + self._filter_courses(course_list) + except Exception as e: + raise FormatError("输入格式错误") from e + + def _filter_courses(self, course_list: List[str]) -> None: + """根据课程ID列表筛选课程""" + self.selected_courses = [ + course for course in self.all_courses + if course["courseId"] in course_list + ] class RollBackManager: + """任务回滚管理类""" + + MAX_ROLLBACK = 3 + def __init__(self) -> None: - self.rollback_times = 0 - self.rollback_id = "" - - def add_times(self,id:str) -> None: - if id == self.rollback_id and self.rollback_times == 3: - raise MaxRollBackError("回滚次数已达3次,请手动检查学习通任务点完成情况") - elif id != self.rollback_id: - # 新job + self.rollback_times: int = 0 + self.rollback_id: str = "" + + def add_times(self, id: str) -> None: + """ + 增加回滚次数 + + Args: + id: 任务点ID + + Raises: + MaxRollBackError: 超过最大回滚次数 + """ + if id == self.rollback_id: + if self.rollback_times >= self.MAX_ROLLBACK: + raise MaxRollBackError("回滚次数已达3次,请手动检查学习通任务点完成情况") + self.rollback_times += 1 + else: self.rollback_id = id self.rollback_times = 1 - else: - self.rollback_times += 1 +def init_config() -> Config: + """ + 初始化配置 + + Returns: + Config: 配置对象 + """ + parser = argparse.ArgumentParser(description='超星学习通自动化学习工具') + parser.add_argument("-c", "--config", type=str, help="使用配置文件运行程序") + parser.add_argument("-u", "--username", type=str, help="手机号账号") + parser.add_argument("-p", "--password", type=str, help="登录密码") + parser.add_argument("-l", "--list", type=str, help="要学习的课程ID列表") + parser.add_argument("-s", "--speed", type=float, default=1.0, help="视频播放倍速(默认1,最大2)") + + args = parser.parse_args() + + if args.config: + return _load_config_file(args.config) + + return Config( + username=args.username, + password=args.password, + course_list=args.list.split(",") if args.list else None, + speed=min(2.0, max(1.0, args.speed)), + tiku_config=None + ) -if __name__ == '__main__': +def _load_config_file(config_path: str) -> Config: + """ + 从配置文件加载配置 + + Args: + config_path: 配置文件路径 + + Returns: + Config: 配置对象 + """ + config = configparser.ConfigParser() + config.read(config_path, encoding="utf8") + + return Config( + username=config.get("common", "username"), + password=config.get("common", "password"), + course_list=config.get("common", "course_list", fallback="").split(",") if config.get("common", "course_list") else None, + speed=float(config.get("common", "speed")), + tiku_config=dict(config["tiku"]) if "tiku" in config and config.get("tiku", "tokens", fallback="").strip() else None + ) + +def process_course_tasks(chaoxing: Chaoxing, course: Dict, speed: float, tiku: Tiku) -> None: + """ + 处理单个课程的所有任务 + + Args: + chaoxing: Chaoxing实例 + course: 课程信息 + speed: 视频播放速度 + tiku: 题库实例 + """ + logger.info(f"开始学习课程: {course['title']}") + + # 获取课程章节 + point_list = chaoxing.get_course_point( + course["courseId"], + course["clazzId"], + course["cpi"] + ) + + rb_manager = RollBackManager() + point_index = 0 + + # 遍历章节 + while point_index < len(point_list["points"]): + point = point_list["points"][point_index] + logger.info(f'当前章节: {point["title"]}') + + try: + process_chapter(chaoxing, course, point, point_index, speed, tiku, rb_manager) + point_index += 1 + except MaxRollBackError as e: + logger.error(str(e)) + break + +def process_chapter( + chaoxing: Chaoxing, + course: Dict, + point: Dict, + point_index: int, + speed: float, + tiku: Tiku, + rb_manager: RollBackManager +) -> None: + """ + 处理单个章节的任务 + + Args: + chaoxing: Chaoxing实例 + course: 课程信息 + point: 章节信息 + point_index: 章节索引 + speed: 视频播放速度 + tiku: 题库实例 + rb_manager: 回滚管理器 + """ + jobs, job_info = chaoxing.get_job_list( + course["clazzId"], + course["courseId"], + course["cpi"], + point["id"] + ) + + # 处理未开放章节 + if job_info.get('notOpen', False): + handle_locked_chapter(point, tiku, rb_manager) + return + + # 处理章节任务 + if jobs: + for job in jobs: + process_job(chaoxing, course, point, job, job_info, speed) + +def handle_locked_chapter(point: Dict, tiku: Optional[Tiku], rb_manager: RollBackManager) -> None: + """ + 处理未开放的章节 + + Args: + point: 章节信息 + tiku: 题库实例 + rb_manager: 回滚管理器 + + Raises: + MaxRollBackError: 章节未开放且无法处理 + """ + if not tiku or tiku.DISABLE or not tiku.SUBMIT: + logger.error("章节未开启,可能由于上一章节的章节检测未完成,请手动完成并提交再重试,或者开启题库并启用提交") + raise MaxRollBackError("章节未开放") + rb_manager.add_times(point["id"]) + +def process_job( + chaoxing: Chaoxing, + course: Dict, + point: Dict, + job: Dict, + job_info: Dict, + speed: float +) -> None: + """ + 处理单个任务点 + + Args: + chaoxing: Chaoxing实例 + course: 课程信息 + point: 章节信息 + job: 任务信息 + job_info: 任务详细信息 + speed: 视频播放速度 + """ + job_type_handlers = { + "video": handle_video_job, + "document": handle_document_job, + "workid": handle_work_job, + "read": handle_read_job + } + + handler = job_type_handlers.get(job["type"]) + if handler: + handler(chaoxing, course, point, job, job_info, speed) + +def handle_video_job( + chaoxing: Chaoxing, + course: Dict, + point: Dict, + job: Dict, + job_info: Dict, + speed: float +) -> None: + """处理视频任务""" + logger.trace(f"识别到视频任务, 任务章节: {course['title']} 任务ID: {job['jobid']}") try: - # 避免异常的无限回滚 - RB = RollBackManager() - # 初始化登录信息 - username, password, course_list, speed,tiku_config= init_config() - # 规范化播放速度的输入值 - speed = min(2.0, max(1.0, speed)) - if (not username) or (not password): - username = input("请输入你的手机号,按回车确认\n手机号:") - password = input("请输入你的密码,按回车确认\n密码:") - account = Account(username, password) - # 设置题库 - tiku = Tiku() - tiku.config_set(tiku_config) # 载入配置 - tiku = tiku.get_tiku_from_config() # 载入题库 - tiku.init_tiku() # 初始化题库 - # 实例化超星API - chaoxing = Chaoxing(account=account,tiku=tiku) - # 检查当前登录状态,并检查账号密码 - _login_state = chaoxing.login() - if not _login_state["status"]: - raise LoginError(_login_state["msg"]) - # 获取所有的课程列表 - all_course = chaoxing.get_course_list() - course_task = [] - # 手动输入要学习的课程ID列表 - if not course_list: - print("*" * 10 + "课程列表" + "*" * 10) - for course in all_course: - print(f"ID: {course['courseId']} 课程名: {course['title']}") - print("*" * 28) - try: - course_list = input("请输入想要学习的课程列表,以逗号分隔,例: 2151141,189191,198198\n").split(",") - except Exception as e: - raise FormatError("输入格式错误") from e - # 筛选需要学习的课程 - for course in all_course: - if course["courseId"] in course_list: - course_task.append(course) - if not course_task: - course_task = all_course - # 开始遍历要学习的课程列表 - logger.info(f"课程列表过滤完毕,当前课程任务数量: {len(course_task)}") - for course in course_task: - logger.info(f"开始学习课程: {course['title']}") - # 获取当前课程的所有章节 - point_list = chaoxing.get_course_point(course["courseId"], course["clazzId"], course["cpi"]) - - # 为了支持课程任务回滚,采用下标方式遍历任务点 - __point_index = 0 - while __point_index < len(point_list["points"]): - point = point_list["points"][__point_index] - logger.info(f'当前章节: {point["title"]}') - # 获取当前章节的所有任务点 - jobs = [] - job_info = None - jobs, job_info = chaoxing.get_job_list(course["clazzId"], course["courseId"], course["cpi"], point["id"]) - - bookID = job_info["knowledgeid"] # 获取视频ID - - # 发现未开放章节,尝试回滚上一个任务重新完成一次 - try: - if job_info.get('notOpen',False): - __point_index -= 1 # 默认第一个任务总是开放的 - # 针对题库启用情况 - if not tiku or tiku.DISABLE or not tiku.SUBMIT: - # 未启用题库或未开启题库提交,章节检测未完成会导致无法开始下一章,直接退出 - logger.error(f"章节未开启,可能由于上一章节的章节检测未完成,请手动完成并提交再重试,或者开启题库并启用提交") - break - RB.add_times(point["id"]) - continue - except MaxRollBackError as e: - logger.error("回滚次数已达3次,请手动检查学习通任务点完成情况") - # 跳过该课程,继续下一课程 - break - - - # 可能存在章节无任何内容的情况 - if not jobs: - __point_index += 1 - continue - # 遍历所有任务点 - for job in jobs: - # 视频任务 - if job["type"] == "video": - - TextBookID = getText() # 获取学习过的课程ID - if TextBookID.count(bookID) > 0: - logger.info(f"课程: {course['title']} 章节: {point['title']} 任务: {job['title']} 已学习过或在学习中,跳过") # 如果已经学习过该课程,则跳过 - break # 如果已经学习过该课程,则跳过 - appendText(bookID) # 记录正在学习的课程ID - - logger.trace(f"识别到视频任务, 任务章节: {course['title']} 任务ID: {job['jobid']}") - # 超星的接口没有返回当前任务是否为Audio音频任务 - isAudio = False - try: - chaoxing.study_video(course, job, job_info, _speed=speed, _type="Video") - except JSONDecodeError as e: - logger.warning("当前任务非视频任务,正在尝试音频任务解码") - isAudio = True - if isAudio: - try: - chaoxing.study_video(course, job, job_info, _speed=speed, _type="Audio") - except JSONDecodeError as e: - logger.warning(f"出现异常任务 -> 任务章节: {course['title']} 任务ID: {job['jobid']}, 已跳过") - # 文档任务 - elif job["type"] == "document": - logger.trace(f"识别到文档任务, 任务章节: {course['title']} 任务ID: {job['jobid']}") - chaoxing.study_document(course, job) - # 测验任务 - elif job["type"] == "workid": - logger.trace(f"识别到章节检测任务, 任务章节: {course['title']}") - chaoxing.study_work(course, job,job_info) - # 阅读任务 - elif job["type"] == "read": - logger.trace(f"识别到阅读任务, 任务章节: {course['title']}") - chaoxing.strdy_read(course, job,job_info) - __point_index += 1 + chaoxing.study_video(course, job, job_info, speed=speed, type_="Video") + except JSONDecodeError: + logger.warning("当前任务非视频任务,正在尝试音频任务解码") + try: + chaoxing.study_video(course, job, job_info, speed=speed, type_="Audio") + except JSONDecodeError: + logger.warning(f"出现异常任务 -> 任务章节: {course['title']} 任务ID: {job['jobid']}, 已跳过") + +def handle_document_job( + chaoxing: Chaoxing, + course: Dict, + point: Dict, + job: Dict, + job_info: Dict, + speed: float +) -> None: + """处理文档任务""" + logger.trace(f"识别到文档任务, 任务章节: {course['title']} 任务ID: {job['jobid']}") + chaoxing.study_document(course, job) + +def handle_work_job( + chaoxing: Chaoxing, + course: Dict, + point: Dict, + job: Dict, + job_info: Dict, + speed: float +) -> None: + """处理测验任务""" + logger.trace(f"识别到章节检测任务, 任务章节: {course['title']}") + chaoxing.study_work(course, job, job_info) + +def handle_read_job( + chaoxing: Chaoxing, + course: Dict, + point: Dict, + job: Dict, + job_info: Dict, + speed: float +) -> None: + """处理阅读任务""" + logger.trace(f"识别到阅读任务, 任务章节: {course['title']}") + chaoxing.study_read(course, job, job_info) + +def main() -> None: + """主函数""" + try: + # 初始化配置 + config = init_config() + + # 获取登录信息 + if not config.username or not config.password: + config.username = input("请输入你的手机号,按回车确认\n手机号:").strip() + config.password = input("请输入你的密码,按回车确认\n密码:").strip() + + # 初始化账号 + account = Account(config.username, config.password) + + # 初始化题库 + tiku = None + if config.tiku_config: + tiku = Tiku() + tiku.config_set(config.tiku_config) + tiku = tiku.get_tiku_from_config() + tiku.init_tiku() + + # 初始化超星API + chaoxing = Chaoxing(account=account, tiku=tiku) + + # 登录检查 + login_state = chaoxing.login() + if not login_state["status"]: + raise LoginError(login_state["msg"]) + + # 课程管理 + course_manager = CourseManager(chaoxing) + course_manager.load_courses() + course_manager.select_courses(config.course_list) + + # 处理所有课程 + for course in course_manager.selected_courses: + process_course_tasks(chaoxing, course, config.speed, tiku) + logger.info("所有课程学习任务已完成") - except BaseException as e: - import traceback + + except Exception as e: logger.error(f"错误: {type(e).__name__}: {e}") logger.error(traceback.format_exc()) - raise e \ No newline at end of file + raise + +if __name__ == '__main__': + main() \ No newline at end of file