diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md deleted file mode 100644 index 03b23f22..00000000 --- a/.github/pull_request_template.md +++ /dev/null @@ -1,20 +0,0 @@ -# 概述 - -**简要概述修改或更新的内容** - - -# 相关Issue -**与提交代码相关的Issue链接** - - -## 更新类型 - -- [x] BUG修复 (不影响项目结构和其他代码的小型BUG修复) -- [x] 新增功能 (不影响项目结构和其他代码的功能新增) -- [x] 重大修改 (可能影响项目结构或其他代码的重大变更) -- [x] 包含文档变更 - -## 其他备注 - -**其他需要注意的内容** - diff --git a/api/base.py b/api/base.py index 214d253a..7c39dfcf 100644 --- a/api/base.py +++ b/api/base.py @@ -1,150 +1,106 @@ # -*- coding: utf-8 -*- -""" -学习通API基础模块 -包含账号登录、课程获取、任务点处理等基础功能 -""" import re import time import random import requests from hashlib import md5 from requests.adapters import HTTPAdapter -from typing import Dict, List, Optional, Union, Any from api.cipher import AESCipher from api.logger import logger from api.cookies import save_cookies, use_cookies from api.process import show_progress from api.config import GlobalConst as gc -from api.decode import ( - decode_course_list, - decode_course_point, - decode_course_card, - decode_course_folder, - decode_questions_info -) +from api.decode import (decode_course_list, + decode_course_point, + decode_course_card, + decode_course_folder, + decode_questions_info + ) from api.answer import * - -def get_timestamp() -> str: - """获取毫秒级时间戳""" +def get_timestamp(): return str(int(time.time() * 1000)) -def get_random_seconds() -> int: - """获取随机等待时间(30-90秒)""" +def get_random_seconds(): return random.randint(30, 90) -def init_session(is_video: bool = False, is_audio: bool = False) -> requests.Session: - """ - 初始化requests会话 - - Args: - is_video: 是否为视频请求 - is_audio: 是否为音频请求 - - Returns: - 配置好的requests.Session对象 - """ - session = requests.session() - session.verify = False - # 配置重试机制 - adapter = HTTPAdapter(max_retries=3) - session.mount('http://', adapter) - session.mount('https://', adapter) - - # 根据请求类型设置对应headers - if is_video: - session.headers = gc.VIDEO_HEADERS - elif is_audio: - session.headers = gc.AUDIO_HEADERS +def init_session(isVideo: bool = False, isAudio: bool = False): + _session = requests.session() + _session.verify = False + _session.mount('http://', HTTPAdapter(max_retries=3)) + _session.mount('https://', HTTPAdapter(max_retries=3)) + if isVideo: + _session.headers = gc.VIDEO_HEADERS + elif isAudio: + _session.headers = gc.AUDIO_HEADERS else: - session.headers = gc.HEADERS - - session.cookies.update(use_cookies()) - return session + _session.headers = gc.HEADERS + _session.cookies.update(use_cookies()) + return _session class Account: - """账号信息类""" - def __init__(self, username: str, password: str): - self.username = username - self.password = password - self.last_login = None - self.is_success = None + username = None + password = None + last_login = None + isSuccess = None + def __init__(self, _username, _password): + self.username = _username + self.password = _password class Chaoxing: - """超星学习通API封装类""" - - def __init__(self, account: Optional[Account] = None, tiku: Optional['Tiku'] = None): + def __init__(self, account: Account = None,tiku:Tiku=None): self.account = account self.cipher = AESCipher() self.tiku = tiku - def login(self) -> Dict[str, Union[bool, str]]: - """ - 登录学习通 - - Returns: - 登录结果字典 {"status": bool, "msg": str} - """ - session = requests.session() - session.verify = False - url = "https://passport2.chaoxing.com/fanyalogin" - - data = { - "fid": "-1", - "uname": self.cipher.encrypt(self.account.username), - "password": self.cipher.encrypt(self.account.password), - "refer": "https%3A%2F%2Fi.chaoxing.com", - "t": True, - "forbidotherlogin": 0, - "validate": "", - "doubleFactorLogin": 0, - "independentId": 0 - } - + def login(self): + _session = requests.session() + _session.verify = False + _url = "https://passport2.chaoxing.com/fanyalogin" + _data = {"fid": "-1", + "uname": self.cipher.encrypt(self.account.username), + "password": self.cipher.encrypt(self.account.password), + "refer": "https%3A%2F%2Fi.chaoxing.com", + "t": True, + "forbidotherlogin": 0, + "validate": "", + "doubleFactorLogin": 0, + "independentId": 0 + } logger.trace("正在尝试登录...") - resp = session.post(url, headers=gc.HEADERS, data=data) - - if resp and resp.json()["status"]: - save_cookies(session) + resp = _session.post(_url, headers=gc.HEADERS, data=_data) + if resp and resp.json()["status"] == True: + save_cookies(_session) logger.info("登录成功...") return {"status": True, "msg": "登录成功"} - return {"status": False, "msg": str(resp.json()["msg2"])} + else: + return {"status": False, "msg": str(resp.json()["msg2"])} - def get_fid(self) -> str: - """获取fid""" - return init_session().cookies.get("fid") + def get_fid(self): + _session = init_session() + return _session.cookies.get("fid") - def get_uid(self) -> str: - """获取用户ID""" - return init_session().cookies.get("_uid") + def get_uid(self): + _session = init_session() + return _session.cookies.get("_uid") - def get_course_list(self) -> List[Dict]: - """ - 获取课程列表 - - Returns: - 课程信息列表 - """ - session = init_session() - url = "https://mooc2-ans.chaoxing.com/mooc2-ans/visit/courselistdata" - - # 基础请求数据 - data = { + def get_course_list(self): + _session = init_session() + _url = "https://mooc2-ans.chaoxing.com/mooc2-ans/visit/courselistdata" + _data = { "courseType": 1, "courseFolderId": 0, "query": "", "superstarClass": 0 } - logger.trace("正在读取所有的课程列表...") - - # 设置专用headers - headers = { + # 接口突然抽风,增加headers + _headers = { "Host": "mooc2-ans.chaoxing.com", "sec-ch-ua-platform": "\"Windows\"", "X-Requested-With": "XMLHttpRequest", @@ -160,240 +116,138 @@ def get_course_list(self) -> List[Dict]: "Referer": "https://mooc2-ans.chaoxing.com/mooc2-ans/visit/interaction?moocDomain=https://mooc1-1.chaoxing.com/mooc-ans", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6,ja;q=0.5" } - - # 获取主课程列表 - resp = session.post(url, headers=headers, data=data) + _resp = _session.post(_url,headers=_headers,data=_data) + # logger.trace(f"原始课程列表内容:\n{_resp.text}") logger.info("课程列表读取完毕...") - course_list = decode_course_list(resp.text) + course_list = decode_course_list(_resp.text) - # 获取文件夹中的课程 - interaction_url = "https://mooc2-ans.chaoxing.com/mooc2-ans/visit/interaction" - interaction_resp = session.get(interaction_url) - course_folder = decode_course_folder(interaction_resp.text) - - # 遍历文件夹获取课程 + _interaction_url = "https://mooc2-ans.chaoxing.com/mooc2-ans/visit/interaction" + _interaction_resp = _session.get(_interaction_url) + course_folder = decode_course_folder(_interaction_resp.text) for folder in course_folder: - folder_data = data.copy() - folder_data["courseFolderId"] = folder["id"] - resp = session.post(url, data=folder_data) - course_list.extend(decode_course_list(resp.text)) - + _data = { + "courseType": 1, + "courseFolderId": folder["id"], + "query": "", + "superstarClass": 0 + } + _resp = _session.post(_url, data=_data) + course_list += decode_course_list(_resp.text) return course_list - def get_course_point(self, courseid: str, clazzid: str, cpi: str) -> List[Dict]: - """ - 获取课程章节信息 - - Args: - courseid: 课程ID - clazzid: 班级ID - cpi: 课程参数 - - Returns: - 章节信息列表 - """ - session = init_session() - url = f"https://mooc2-ans.chaoxing.com/mooc2-ans/mycourse/studentcourse?courseid={courseid}&clazzid={clazzid}&cpi={cpi}&ut=s" - + def get_course_point(self, _courseid, _clazzid, _cpi): + _session = init_session() + _url = f"https://mooc2-ans.chaoxing.com/mooc2-ans/mycourse/studentcourse?courseid={_courseid}&clazzid={_clazzid}&cpi={_cpi}&ut=s" logger.trace("开始读取课程所有章节...") - resp = session.get(url) + _resp = _session.get(_url) + # logger.trace(f"原始章节列表内容:\n{_resp.text}") logger.info("课程章节读取成功...") - - return decode_course_point(resp.text) + return decode_course_point(_resp.text) - def get_job_list(self, clazzid: str, courseid: str, cpi: str, knowledgeid: str) -> tuple[List[Dict], Dict]: - """ - 获取章节任务点列表 - - Args: - clazzid: 班级ID - courseid: 课程ID - cpi: 课程参数 - knowledgeid: 知识点ID - - Returns: - (任务点列表, 任务点信息) - """ - session = init_session() + def get_job_list(self, _clazzid, _courseid, _cpi, _knowledgeid): + _session = init_session() job_list = [] job_info = {} - - # 遍历可能的任务卡片数量 - for num in ["0", "1", "2"]: - url = f"https://mooc1.chaoxing.com/mooc-ans/knowledge/cards?clazzid={clazzid}&courseid={courseid}&knowledgeid={knowledgeid}&num={num}&ut=s&cpi={cpi}&v=20160407-3&mooc2=1" - + for _possible_num in ["0", "1","2"]: # 学习界面任务卡片数,很少有3个的,但是对于章节解锁任务点少一个都不行,可以从API /mooc-ans/mycourse/studentstudyAjax获取值,或者干脆直接加,但二者都会造成额外的请求 + _url = f"https://mooc1.chaoxing.com/mooc-ans/knowledge/cards?clazzid={_clazzid}&courseid={_courseid}&knowledgeid={_knowledgeid}&num={_possible_num}&ut=s&cpi={_cpi}&v=20160407-3&mooc2=1" logger.trace("开始读取章节所有任务点...") - resp = session.get(url) - tasks, info = decode_course_card(resp.text) - - # 检查章节是否开放 - if info.get('notOpen', False): + _resp = _session.get(_url) + _job_list, _job_info = decode_course_card(_resp.text) + if _job_info.get('notOpen',False): + # 直接返回,节省一次请求 logger.info("该章节未开放") - return [], info - - job_list.extend(tasks) - job_info.update(info) - + return [], _job_info + job_list += _job_list + job_info.update(_job_info) + # if _job_list and len(_job_list) != 0: + # break + # logger.trace(f"原始任务点列表内容:\n{_resp.text}") logger.info("章节任务点读取成功...") return job_list, job_info - def get_enc(self, clazz_id: str, jobid: str, object_id: str, playing_time: int, - duration: int, userid: str) -> str: - """ - 生成视频播放加密参数 - """ - raw = f"[{clazz_id}][{userid}][{jobid}][{object_id}][{playing_time * 1000}][d_yHJ!$pdA~5][{duration * 1000}][0_{duration}]" - return md5(raw.encode()).hexdigest() + def get_enc(self, clazzId, jobid, objectId, playingTime, duration, userid): + return md5( + f"[{clazzId}][{userid}][{jobid}][{objectId}][{playingTime * 1000}][d_yHJ!$pdA~5][{duration * 1000}][0_{duration}]" + .encode()).hexdigest() - def video_progress_log(self, session: requests.Session, course: Dict, job: Dict, - job_info: Dict, dtoken: str, duration: int, playing_time: int, - type_: str = "Video") -> Union[Dict, bool]: - """ - 记录视频播放进度 - - Args: - session: 请求会话 - course: 课程信息 - job: 任务信息 - job_info: 任务详细信息 - dtoken: 播放token - duration: 视频总时长 - playing_time: 当前播放时间 - type_: 媒体类型(Video/Audio) - - Returns: - 响应结果或False(失败) - """ - # 构建otherInfo参数 - other_info = (f"otherInfo={job['otherinfo']}&courseId={course['courseId']}&" - if "courseId" not in job['otherinfo'] - else f"otherInfo={job['otherinfo']}&") - - success = False - # 尝试不同的rt参数 - for rt in ["0.9", "1"]: - url = ( - f"https://mooc1.chaoxing.com/mooc-ans/multimedia/log/a/{course['cpi']}/{dtoken}?" - f"clazzId={course['clazzId']}&" - f"playingTime={playing_time}&" - f"duration={duration}&" - f"clipTime=0_{duration}&" - f"objectId={job['objectid']}&" - f"{other_info}" - f"jobid={job['jobid']}&" - f"userid={self.get_uid()}&" - f"isdrag=3&" - f"view=pc&" - f"enc={self.get_enc(course['clazzId'], job['jobid'], job['objectid'], playing_time, duration, self.get_uid())}&" - f"rt={rt}&" - f"dtype={type_}&" - f"_t={get_timestamp()}" - ) - - resp = session.get(url) + def video_progress_log(self, _session, _course, _job, _job_info, _dtoken, _duration, _playingTime, _type: str = "Video"): + if "courseId" in _job['otherinfo']: + _mid_text = f"otherInfo={_job['otherinfo']}&" + else: + _mid_text = f"otherInfo={_job['otherinfo']}&courseId={_course['courseId']}&" + _success = False + for _possible_rt in ["0.9", "1"]: + _url = (f"https://mooc1.chaoxing.com/mooc-ans/multimedia/log/a/" + f"{_course['cpi']}/" + f"{_dtoken}?" + f"clazzId={_course['clazzId']}&" + f"playingTime={_playingTime}&" + f"duration={_duration}&" + f"clipTime=0_{_duration}&" + f"objectId={_job['objectid']}&" + f"{_mid_text}" + f"jobid={_job['jobid']}&" + f"userid={self.get_uid()}&" + f"isdrag=3&" + f"view=pc&" + f"enc={self.get_enc(_course['clazzId'], _job['jobid'], _job['objectid'], _playingTime, _duration, self.get_uid())}&" + f"rt={_possible_rt}&" + f"dtype={_type}&" + f"_t={get_timestamp()}") + resp = _session.get(_url) if resp.status_code == 200: - success = True - break - - if success: + _success = True + break # 如果返回为200正常,则跳出循环 + elif resp.status_code == 403: + continue # 如果出现403无权限报错,则继续尝试不同的rt参数 + if _success: return resp.json() - - logger.warning("出现403报错,尝试修复无效,正在跳过当前任务点...") - return False + else: + # 若出现两个rt参数都返回403的情况,则跳过当前任务 + logger.warning("出现403报错,尝试修复无效,正在跳过当前任务点...") + return False - def study_video(self, course: Dict, job: Dict, job_info: Dict, - speed: float = 1.0, type_: str = "Video") -> None: - """ - 学习视频/音频任务 - - Args: - course: 课程信息 - job: 任务信息 - job_info: 任务详细信息 - speed: 播放速度 - type_: 媒体类型(Video/Audio) - """ - # 初始化会话 - session = init_session(is_video=(type_=="Video"), is_audio=(type_=="Audio")) - - # 获取视频信息 - info_url = f"https://mooc1.chaoxing.com/ananas/status/{job['objectid']}?k={self.get_fid()}&flag=normal" - video_info = session.get(info_url).json() - - if "status" not in video_info: - logger.info("获取视频信息失败,跳过当前任务点...") - return None - - if video_info["status"] == "success": - dtoken = video_info["dtoken"] - duration = video_info["duration"] - - is_passed = False - is_finished = False - playing_time = 0 - - logger.info(f"开始任务: {job['name']}, 总时长: {duration}秒") - - # 循环提交播放进度 - while not is_finished: - if is_finished: - playing_time = duration - - is_passed = self.video_progress_log( - session, course, job, job_info, dtoken, - duration, playing_time, type_ - ) - - if not is_passed or (is_passed and is_passed["isPassed"]): + def study_video(self, _course, _job, _job_info, _speed: float = 1.0, _type: str = "Video"): + if _type == "Video": + _session = init_session(isVideo=True) + else: + _session = init_session(isAudio=True) + _session.headers.update() + _info_url = f"https://mooc1.chaoxing.com/ananas/status/{_job['objectid']}?k={self.get_fid()}&flag=normal" + _video_info = _session.get(_info_url).json() + if _video_info["status"] == "success": + _dtoken = _video_info["dtoken"] + _duration = _video_info["duration"] + _crc = _video_info["crc"] + _key = _video_info["key"] + _isPassed = False + _isFinished = False + _playingTime = 0 + logger.info(f"开始任务: {_job['name']}, 总时长: {_duration}秒") + while not _isFinished: + if _isFinished: + _playingTime = _duration + _isPassed = self.video_progress_log(_session, _course, _job, _job_info, _dtoken, _duration, _playingTime, _type) + if not _isPassed or (_isPassed and _isPassed["isPassed"]): break - - # 计算等待时间 - wait_time = get_random_seconds() - if playing_time + wait_time >= int(duration): - wait_time = int(duration) - playing_time - is_finished = True - - # 显示进度 - show_progress(job['name'], playing_time, wait_time, duration, speed) - playing_time += wait_time - + _wait_time = get_random_seconds() + if _playingTime + _wait_time >= int(_duration): + _wait_time = int(_duration) - _playingTime + _isFinished = True + # 播放进度条 + show_progress(_job['name'], _playingTime, _wait_time, _duration, _speed) + _playingTime += _wait_time print("\r", end="", flush=True) - logger.info(f"任务完成: {job['name']}") - - def study_document(self, course: Dict, job: Dict) -> None: - """ - 学习文档任务 - - Args: - course: 课程信息 - job: 任务信息 - """ - session = init_session() - url = (f"https://mooc1.chaoxing.com/ananas/job/document?" - f"jobid={job['jobid']}&" - f"knowledgeid={re.findall(r'nodeId_(.*?)-', job['otherinfo'])[0]}&" - f"courseid={course['courseId']}&" - f"clazzid={course['clazzId']}&" - f"jtoken={job['jtoken']}&" - f"_dc={get_timestamp()}") - session.get(url) - - def study_work(self, course: Dict, job: Dict, job_info: Dict) -> None: - """ - 完成作业任务 - - Args: - course: 课程信息 - job: 任务信息 - job_info: 任务详细信息 - """ - if not self.tiku: - logger.info("未配置题库,跳过答题") - return None - + logger.info(f"任务完成: {_job['name']}") + + def study_document(self, _course, _job): + _session = init_session() + _url = f"https://mooc1.chaoxing.com/ananas/job/document?jobid={_job['jobid']}&knowledgeid={re.findall(r'nodeId_(.*?)-', _job['otherinfo'])[0]}&courseid={_course['courseId']}&clazzid={_course['clazzId']}&jtoken={_job['jtoken']}&_dc={get_timestamp()}" + _resp = _session.get(_url) + + def study_work(self, _course, _job,_job_info) -> None: if self.tiku.DISABLE or not self.tiku: return None - def random_answer(options:str) -> str: answer = '' if not options: @@ -452,20 +306,20 @@ def multi_cut(answer:str) -> list[str]: verify=False, params = { "api": "1", - "workId": job['jobid'].replace("work-",""), - "jobid": job['jobid'], - "originJobId": job['jobid'], + "workId": _job['jobid'].replace("work-",""), + "jobid": _job['jobid'], + "originJobId": _job['jobid'], "needRedirect": "true", "skipHeader": "true", - "knowledgeid": str(job_info['knowledgeid']), - 'ktoken': job_info['ktoken'], - "cpi": job_info['cpi'], + "knowledgeid": str(_job_info['knowledgeid']), + 'ktoken': _job_info['ktoken'], + "cpi": _job_info['cpi'], "ut": "s", - "clazzId": course['clazzId'], + "clazzId": _course['clazzId'], "type": "", - "enc": job['enc'], + "enc": _job['enc'], "mooc2": "1", - "courseid": course['courseId'] + "courseid": _course['courseId'] } ) questions = decode_questions_info(_resp.text) # 加载题目信息 @@ -543,29 +397,25 @@ def multi_cut(answer:str) -> list[str]: else: logger.error(f"提交答题失败 -> {res.text}") - def strdy_read(self, course: Dict, job: Dict, job_info: Dict) -> None: + def strdy_read(self, _course, _job,_job_info) -> None: """ - 完成阅读任务(仅完成任务点,不增加时长) - - Args: - course: 课程信息 - job: 任务信息 - job_info: 任务详细信息 + 阅读任务学习,仅完成任务点,并不增长时长 """ - session = init_session() - resp = session.get( + _session = init_session() + _resp = _session.get( url="https://mooc1.chaoxing.com/ananas/job/readv2", params={ - 'jobid': job['jobid'], - 'knowledgeid': job_info['knowledgeid'], - 'jtoken': job['jtoken'], - 'courseid': course['courseId'], - 'clazzid': course['clazzId'] + 'jobid': _job['jobid'], + 'knowledgeid':_job_info['knowledgeid'], + 'jtoken': _job['jtoken'], + 'courseid': _course['courseId'], + 'clazzid': _course['clazzId'] } ) - - if resp.status_code != 200: - logger.error(f"阅读任务学习失败 -> [{resp.status_code}]{resp.text}") + if _resp.status_code != 200: + logger.error(f"阅读任务学习失败 -> [{_resp.status_code}]{_resp.text}") else: - resp_json = resp.json() - logger.info(f"阅读任务学习 -> {resp_json['msg']}") + _resp_json = _resp.json() + logger.info(f"阅读任务学习 -> {_resp_json['msg']}") + + diff --git a/api/cipher.py b/api/cipher.py index e3b2502c..e06e7cdd 100644 --- a/api/cipher.py +++ b/api/cipher.py @@ -1,86 +1,54 @@ # -*- coding:utf-8 -*- -""" -AES加密模块 -提供PKCS7填充和AES-CBC模式加密功能 -""" - +# -*- coding: utf-8 -*- import base64 -from typing import List, Union import pyaes from api.config import GlobalConst as gc -def pkcs7_unpadding(data: bytes) -> bytes: - """ - 移除PKCS7填充 - - Args: - data: 需要移除填充的字节串 - - Returns: - 移除填充后的字节串 - """ - padding_length = data[-1] - return data[:-padding_length] +def pkcs7_unpadding(string): + return string[0:-ord(string[-1])] -def pkcs7_padding(data: bytes, block_size: int = 16) -> bytes: - """ - 添加PKCS7填充 - - Args: - data: 需要填充的字节串 - block_size: 块大小,默认16字节 - - Returns: - 填充后的字节串 - """ - padding_length = block_size - len(data) % block_size - padding = bytes([padding_length] * padding_length) - return data + padding +def pkcs7_padding(s, block_size=16): + bs = block_size + return s + (bs - len(s) % bs) * chr(bs - len(s) % bs).encode() -def split_to_blocks(data: bytes, block_size: int = 16) -> List[bytes]: - """ - 将字节串分割为固定大小的块 - - Args: - data: 需要分割的字节串 - block_size: 块大小,默认16字节 - - Returns: - 分割后的块列表 - """ - return [data[i:i + block_size] for i in range(0, len(data), block_size)] +def split_to_data_blocks(byte_str, block_size=16): + length = len(byte_str) + j, y = divmod(length, block_size) + blocks = [] + shenyu = j * block_size + for i in range(j): + start = i * block_size + end = (i + 1) * block_size + blocks.append(byte_str[start:end]) + stext = byte_str[shenyu:] + if stext: + blocks.append(stext) + return blocks -class AESCipher: - """AES加密类,使用CBC模式""" - +class AESCipher(): def __init__(self): - """初始化密钥和IV向量""" - self.key = str(gc.AESKey).encode("utf-8") - self.iv = str(gc.AESKey).encode("utf-8") - - def encrypt(self, plaintext: str) -> str: - """ - 加密明文 - - Args: - plaintext: 待加密的字符串 - - Returns: - Base64编码的密文 - """ - # 初始化CBC模式 + self.key = str(gc.AESKey).encode("utf8") + self.iv = str(gc.AESKey).encode("utf8") + + def encrypt(self, plaintext: str): + ciphertext = b'' cbc = pyaes.AESModeOfOperationCBC(self.key, self.iv) - - # 对明文进行填充并分块 - padded_data = pkcs7_padding(plaintext.encode('utf-8')) - blocks = split_to_blocks(padded_data) - - # 加密所有数据块 - ciphertext = b''.join(cbc.encrypt(block) for block in blocks) - - # Base64编码并返回 - return base64.b64encode(ciphertext).decode("utf-8") \ No newline at end of file + plaintext = plaintext.encode('utf-8') + blocks = split_to_data_blocks(pkcs7_padding(plaintext)) + for b in blocks: + ciphertext = ciphertext + cbc.encrypt(b) + base64_text = base64.b64encode(ciphertext).decode("utf8") + return base64_text + + # def decrypt(self, ciphertext: str): + # cbc = pyaes.AESModeOfOperationCBC(self.key, self.iv) + # ciphertext.encode('utf8') + # ciphertext = base64.b64decode(ciphertext) + # ptext = b"" + # for b in split_to_data_blocks(ciphertext): + # ptext = ptext + cbc.decrypt(b) + # return pkcs7_unpadding(ptext.decode()) \ No newline at end of file diff --git a/api/decode.py b/api/decode.py index a88d9506..7fe74639 100644 --- a/api/decode.py +++ b/api/decode.py @@ -1,306 +1,231 @@ # -*- coding: utf-8 -*- -""" -学习通解码模块 -用于解析和处理学习通页面中的数据 -包含课程列表、文件夹、章节点、任务卡片、题目等内容的解析 -""" - import re import json from bs4 import BeautifulSoup from api.logger import logger from api.font_decoder import FontDecoder - -def decode_course_list(html_text: str) -> list: - """解析课程列表页面,提取课程信息 - - Args: - html_text: 课程列表页面的HTML文本 - - Returns: - list: 包含所有课程信息的列表,每个课程为一个字典 - """ +def decode_course_list(_text): logger.trace("开始解码课程列表...") - soup = BeautifulSoup(html_text, "lxml") - courses = [] - - for course in soup.select("div.course"): - # 跳过未开放的课程 - if course.select_one("a.not-open-tip") or course.select_one("div.not-open-tip"): - continue - - course_info = { - "id": course.attrs["id"], - "info": course.attrs["info"], - "roleid": course.attrs["roleid"], - "clazzId": course.select_one("input.clazzId").attrs["value"], - "courseId": course.select_one("input.courseId").attrs["value"], - "cpi": re.findall(r"cpi=(.*?)&", course.select_one("a").attrs["href"])[0], - "title": course.select_one("span.course-name").attrs["title"], - "desc": course.select_one("p.margint10").attrs["title"] if course.select_one("p.margint10") else '', - "teacher": course.select_one("p.color3").attrs["title"] - } - courses.append(course_info) - - return courses - - -def decode_course_folder(html_text: str) -> list: - """解析二级课程文件夹列表 - - Args: - html_text: 文件夹页面的HTML文本 - - Returns: - list: 包含所有文件夹信息的列表 - """ + _soup = BeautifulSoup(_text, "lxml") + _raw_courses = _soup.select("div.course") + _course_list = list() + for course in _raw_courses: + if not course.select_one("a.not-open-tip") and not course.select_one("div.not-open-tip"): + _course_detail = {} + _course_detail["id"] = course.attrs["id"] + _course_detail["info"] = course.attrs["info"] + _course_detail["roleid"] = course.attrs["roleid"] + + _course_detail["clazzId"] = course.select_one("input.clazzId").attrs["value"] + _course_detail["courseId"] = course.select_one("input.courseId").attrs["value"] + _course_detail["cpi"] = re.findall(r"cpi=(.*?)&", course.select_one("a").attrs["href"])[0] + _course_detail["title"] = course.select_one("span.course-name").attrs["title"] + if course.select_one("p.margint10") is None: + _course_detail["desc"] = '' + else: + _course_detail["desc"] = course.select_one("p.margint10").attrs["title"] + _course_detail["teacher"] = course.select_one("p.color3").attrs["title"] + _course_list.append(_course_detail) + return _course_list + +def decode_course_folder(_text): logger.trace("开始解码二级课程列表...") - soup = BeautifulSoup(html_text, "lxml") - folders = [] - - for course in soup.select("ul.file-list>li"): + _soup = BeautifulSoup(_text, "lxml") + _raw_courses = _soup.select("ul.file-list>li") + _course_folder_list = list() + for course in _raw_courses: if course.attrs["fileid"]: - folder = { - "id": course.attrs["fileid"], - "rename": course.select_one("input.rename-input").attrs["value"] - } - folders.append(folder) - - return folders - + _course_folder_detail = {} + _course_folder_detail["id"] = course.attrs["fileid"] + _course_folder_detail["rename"] = course.select_one("input.rename-input").attrs["value"] + _course_folder_list.append(_course_folder_detail) + return _course_folder_list -def decode_course_point(html_text: str) -> dict: - """解析课程章节列表 - - Args: - html_text: 章节列表页面的HTML文本 - - Returns: - dict: 包含章节点信息的字典,格式为: - { - "hasLocked": bool, # 是否有需要解锁的章节 - "points": list # 章节点列表 - } - """ +def decode_course_point(_text): logger.trace("开始解码章节列表...") - soup = BeautifulSoup(html_text, "lxml") - course_point = { - "hasLocked": False, + _soup = BeautifulSoup(_text, "lxml") + _course_point = { + "hasLocked": False, # 用于判断该课程任务是否是需要解锁 "points": [] } - for chapter in soup.find_all("div", class_="chapter_unit"): - points = [] - for point_li in chapter.find_all("li"): - point_div = point_li.div - if not "id" in point_div.attrs: + + for _chapter_unit in _soup.find_all("div",class_="chapter_unit") : + _point_list = [] + _raw_points = _chapter_unit.find_all("li") + for _point in _raw_points: + _point = _point.div + if (not "id" in _point.attrs): continue - - point = { - "id": re.findall(r"^cur(\d{1,20})$", point_div.attrs["id"])[0], - "title": point_div.select_one("a.clicktitle").text.replace("\n",'').strip(), - "jobCount": 1 # 默认任务点数为1 - } - - # 获取实际任务点数或检查是否需要解锁 - job_count_input = point_div.select_one("input.knowledgeJobCount") - if job_count_input: - point["jobCount"] = job_count_input.attrs["value"] - elif '解锁' in point_div.select_one("span.bntHoverTips").text: - course_point["hasLocked"] = True - - points.append(point) + _point_detail = {} + _point_detail["id"] = re.findall(r"^cur(\d{1,20})$", _point.attrs["id"])[0] + _point_detail["title"] = _point.select_one("a.clicktitle").text.replace("\n",'').strip(' ') + _point_detail["jobCount"] = 1 # 默认为1 + if _point.select_one("input.knowledgeJobCount"): + _point_detail["jobCount"] = _point.select_one("input.knowledgeJobCount").attrs["value"] + else: + # 判断是不是因为需要解锁 + if '解锁' in _point.select_one("span.bntHoverTips").text: + _course_point["hasLocked"] = True - course_point["points"].extend(points) - - return course_point + _point_list.append(_point_detail) + _course_point["points"]+=_point_list + return _course_point -def decode_course_card(html_text: str) -> tuple: - """解析课程任务卡片列表 - - Args: - html_text: 任务卡片页面的HTML文本 - - Returns: - tuple: (任务列表, 任务信息) - 任务列表包含所有未完成的任务 - 任务信息包含ktoken等必要参数 - """ +def decode_course_card(_text: str): logger.trace("开始解码任务点列表...") + _job_info = {} + _job_list = [] + # 对于未开放章节检测 + if '章节未开放' in _text: + _job_info['notOpen'] = True + return [],_job_info - # 检查章节是否开放 - if '章节未开放' in html_text: - return [], {'notOpen': True} - - # 提取任务卡片数据 - card_data = re.findall(r"mArg=\{(.*?)\};", html_text.replace(" ", "")) - if not card_data: - return [], {} - - cards = json.loads("{" + card_data[0] + "}") - if not cards: - return [], {} - - # 提取任务信息 - job_info = { - key: cards["defaults"][key] - for key in ["ktoken", "mtEnc", "reportTimeInterval", "defenc", - "cardid", "cpi", "qnenc", "knowledgeid"] - } + _temp = re.findall(r"mArg=\{(.*?)\};", _text.replace(" ", "")) + if _temp: + _temp = _temp[0] + else: + return [],{} + _cards = json.loads("{" + _temp + "}") + + if _cards: + _job_info = {} + _job_info["ktoken"] = _cards["defaults"]["ktoken"] + _job_info["mtEnc"] = _cards["defaults"]["mtEnc"] + _job_info["reportTimeInterval"] = _cards["defaults"]["reportTimeInterval"] # 60 + _job_info["defenc"] = _cards["defaults"]["defenc"] + _job_info["cardid"] = _cards["defaults"]["cardid"] + _job_info["cpi"] = _cards["defaults"]["cpi"] + _job_info["qnenc"] = _cards["defaults"]["qnenc"] + _job_info['knowledgeid'] = _cards["defaults"]["knowledgeid"] + _cards = _cards["attachments"] + _job_list = [] + for _card in _cards: + # 已经通过的任务 + if "isPassed" in _card and _card["isPassed"] is True: + continue + # 不属于任务点的任务 + if "job" not in _card or _card["job"] is False: + if _card.get('type') and _card['type'] == "read": + # 发现有在视频任务下掺杂阅读任务,不完成可能会导致无法开启下一章节 + if _card['property'].get('read',False): + # 已阅读,跳过 + continue + _job = {} + _job['title'] = _card['property']['title'] + _job["type"] = "read" + _job['id'] = _card['property']['id'] + _job["jobid"] = _card["jobid"] + _job["jtoken"] = _card["jtoken"] + _job['mid'] = _card['mid'] + _job['otherinfo'] = _card["otherInfo"] + _job['enc'] = _card["enc"] + _job['aid'] = _card["aid"] + _job_list.append(_job) + continue + # 视频任务 + if _card["type"] == "video": + _job = {} + _job["type"] = "video" + _job["jobid"] = _card["jobid"] + _job["name"] = _card["property"]["name"] + _job["otherinfo"] = _card["otherInfo"] + try: + _job["mid"] = _card["mid"] + except KeyError: + logger.warning("出现转码失败视频,已跳过...") + continue + _job["objectid"] = _card["objectId"] + _job["aid"] = _card["aid"] + # _job["doublespeed"] = _card["property"]["doublespeed"] + _job_list.append(_job) + continue + if _card["type"] == "document": + _job = {} + _job["type"] = "document" + _job["jobid"] = _card["jobid"] + _job["otherinfo"] = _card["otherInfo"] + _job["jtoken"] = _card["jtoken"] + _job["mid"] = _card["mid"] + _job["enc"] = _card["enc"] + _job["aid"] = _card["aid"] + _job["objectid"] = _card["property"]["objectid"] + _job_list.append(_job) + continue + if _card["type"] == "workid": + # 章节检测 + _job = {} + _job["type"] = "workid" + _job["jobid"] = _card["jobid"] + _job["otherinfo"] = _card["otherInfo"] + _job["mid"] = _card["mid"] + _job["enc"] = _card["enc"] + _job["aid"] = _card["aid"] + _job_list.append(_job) + continue + + if _card["type"] == "vote": + # 调查问卷 同上 + continue + return _job_list, _job_info - # 解析任务列表 - job_list = [] - for card in cards["attachments"]: - # 跳过已完成任务 - if card.get("isPassed"): - continue - - # 处理非任务点内容 - if not card.get("job"): - if _should_process_read_task(card): - job_list.append(_parse_read_task(card)) - continue - - # 根据任务类型解析 - job_parser = { - "video": _parse_video_task, - "document": _parse_document_task, - "workid": _parse_workid_task - }.get(card["type"]) - - if job_parser: - job = job_parser(card) - if job: - job_list.append(job) - - return job_list, job_info - -def decode_questions_info(html_content: str) -> dict: - """解析试题信息 - - Args: - html_content: 试题页面的HTML内容 - - Returns: - dict: 包含试题信息的字典 - """ - def replace_rtn(text: str) -> str: - """清理文本中的特殊字符""" +def decode_questions_info(html_content) -> dict: + def replace_rtn(text): return text.replace('\r', '').replace('\t', '').replace('\n', '') soup = BeautifulSoup(html_content, "lxml") + form_data = {} form_tag = soup.find("form") - fd = FontDecoder(html_content) # 加载字体解码器 - - # 提取表单基本信息 - form_data = { - input_tag.attrs["name"]: input_tag.attrs.get("value",'') - for input_tag in form_tag.find_all("input") - if 'name' in input_tag.attrs and 'answer' not in input_tag.attrs["name"] - } - - # 题型映射表 - QUESTION_TYPES = { - '0': 'single', # 单选题 - '1': 'multiple', # 多选题 - '2': 'completion', # 填空题 - '3': 'judgement' # 判断题 - } + + fd = FontDecoder(html_content) # 加载字体 - # 解析试题 + # 抽取表单信息 + for input_tag in form_tag.find_all("input"): + if 'name' not in input_tag.attrs or 'answer' in input_tag.attrs["name"]: + continue + form_data.update({ + input_tag.attrs["name"]: input_tag.attrs.get("value",'') + }) + form_data['questions'] = [] - for div_tag in form_tag.find_all("div", class_="singleQuesId"): - # 提取题目和选项 - title = replace_rtn(fd.decode(div_tag.find("div", class_="Zy_TItle").text)) - options = '\n'.join( - replace_rtn(fd.decode(li.text)) - for li in div_tag.find("ul").find_all("li") - ) - - # 获取题型 - type_code = div_tag.find('div', class_='TiMu').attrs['data'] - q_type = QUESTION_TYPES.get(type_code, 'unknown') - if q_type == 'unknown': - logger.info(f"未知题型代码 -> {type_code}") - - # 构建题目数据 - question = { + for div_tag in form_tag.find_all("div",class_="singleQuesId"): # 目前来说无论是单选还是多选的题class都是这个 + q_title = replace_rtn(fd.decode(div_tag.find("div", class_="Zy_TItle").text)) + q_options = '' + for li_tag in div_tag.find("ul").find_all("li"): + q_options += replace_rtn(fd.decode(li_tag.text))+'\n' + q_options=q_options[:-1] # 去除尾部'\n' + + # 尝试使用 data 属性来判断题型 + q_type_code = div_tag.find('div',class_='TiMu').attrs['data'] + q_type = '' + # 此处可能需要完善更多题型的判断 + if q_type_code == '0': + q_type = 'single' + elif q_type_code == '1': + q_type = 'multiple' + elif q_type_code == '2': + q_type = 'completion' + elif q_type_code == '3': + q_type = 'judgement' + else: + logger.info("未知题型代码 -> "+q_type_code) + q_type = 'unknown' # 避免出现未定义取值错误 + + form_data["questions"].append({ 'id': div_tag.attrs["data"], - 'title': title, - 'options': options, - 'type': q_type, - 'answerField': { - f'answer{div_tag.attrs["data"]}': '', - f'answertype{div_tag.attrs["data"]}': type_code + 'title':q_title, # 题目 + 'options':q_options, # 选项 可提供给题库作为辅助 + 'type': q_type, # 题型 可提供给题库作为辅助 + 'answerField':{ + 'answer'+div_tag.attrs["data"]:'', # 答案填入处 + 'answertype'+div_tag.attrs["data"]:q_type_code } - } - form_data['questions'].append(question) - - # 添加答题ID列表 - form_data['answerwqbid'] = ','.join(q['id'] for q in form_data['questions']) + ',' - + }) + # 处理答题信息 + form_data['answerwqbid'] = ",".join([q['id'] for q in form_data['questions']])+"," return form_data -# 辅助函数 -def _should_process_read_task(card: dict) -> bool: - """判断是否需要处理阅读任务""" - return (card.get('type') == "read" and - not card.get('property', {}).get('read', False)) - -def _parse_read_task(card: dict) -> dict: - """解析阅读任务""" - return { - 'title': card['property']['title'], - 'type': 'read', - 'id': card['property']['id'], - 'jobid': card['jobid'], - 'jtoken': card['jtoken'], - 'mid': card['mid'], - 'otherinfo': card['otherInfo'], - 'enc': card['enc'], - 'aid': card['aid'] - } - -def _parse_video_task(card: dict) -> dict: - """解析视频任务""" - try: - return { - 'type': 'video', - 'jobid': card['jobid'], - 'name': card['property']['name'], - 'otherinfo': card['otherInfo'], - 'mid': card['mid'], - 'objectid': card['objectId'], - 'aid': card['aid'] - } - except KeyError: - logger.warning("出现转码失败视频,已跳过...") - return None - -def _parse_document_task(card: dict) -> dict: - """解析文档任务""" - return { - 'type': 'document', - 'jobid': card['jobid'], - 'otherinfo': card['otherInfo'], - 'jtoken': card['jtoken'], - 'mid': card['mid'], - 'enc': card['enc'], - 'aid': card['aid'], - 'objectid': card['property']['objectid'] - } - -def _parse_workid_task(card: dict) -> dict: - """解析章节测验任务""" - return { - 'type': 'workid', - 'jobid': card['jobid'], - 'otherinfo': card['otherInfo'], - 'mid': card['mid'], - 'enc': card['enc'], - 'aid': card['aid'] - } diff --git a/main.py b/main.py index 2630ddf9..3bc8800a 100644 --- a/main.py +++ b/main.py @@ -1,386 +1,194 @@ -#!/usr/bin/env python3 # -*- coding: utf-8 -*- - -""" -超星学习通自动化学习脚本 -功能: -- 自动完成视频/音频任务 -- 自动完成文档任务 -- 自动完成测验任务(需配置题库) -- 自动完成阅读任务 -""" - import argparse import configparser -import os -import traceback -from dataclasses import dataclass -from typing import Optional, Tuple, List, Dict, Any, Union - -from urllib3 import disable_warnings, exceptions - from api.logger import logger from api.base import Chaoxing, Account +from api.exceptions import LoginError, FormatError, JSONDecodeError,MaxRollBackError from api.answer import Tiku -from api.exceptions import LoginError, FormatError, JSONDecodeError, MaxRollBackError +from urllib3 import disable_warnings,exceptions +import os -# 关闭 SSL 警告 -disable_warnings(exceptions.InsecureRequestWarning) +# 定义全局变量,用于存储配置文件路径 +textPath = './resource/BookID.txt' -@dataclass -class Config: - """配置数据类""" - username: str - password: str - course_list: Optional[List[str]] - speed: float - tiku_config: Optional[Dict[str, Any]] +# 获取文本 -> 用于查看学习过的课程ID +def getText(): + try: + if not os.path.exists(textPath): + with open(textPath, 'x') as file: pass + return [] + with open(textPath, 'r', encoding='utf-8') as file: content = file.read().split(',') + content = {int(item.strip()) for item in content if item.strip()} + return list(content) + except Exception as e: logger.error(f"获取文本失败: {e}"); return [] -class CourseManager: - """课程管理类""" +# 追加文本 -> 用于记录学习过的课程ID +def appendText(text): + if not os.path.exists(textPath): return + with open(textPath, 'a', encoding='utf-8') as file: file.write(f'{text}, ') - def __init__(self, chaoxing: Chaoxing): - self.chaoxing = chaoxing - self.all_courses: List[Dict] = [] - self.selected_courses: List[Dict] = [] - def load_courses(self) -> None: - """加载所有课程""" - self.all_courses = self.chaoxing.get_course_list() - - def select_courses(self, course_list: Optional[List[str]] = None) -> None: - """ - 选择要学习的课程 - - Args: - course_list: 课程ID列表,为None时通过交互方式选择 - """ - if not course_list: - self._interactive_select() - else: - self._filter_courses(course_list) - - if not self.selected_courses: - self.selected_courses = self.all_courses - - logger.info(f"课程列表过滤完毕,当前课程任务数量: {len(self.selected_courses)}") - - def _interactive_select(self) -> None: - """交互式选择课程""" - print("*" * 10 + "课程列表" + "*" * 10) - for course in self.all_courses: - print(f"ID: {course['courseId']} 课程名: {course['title']}") - print("*" * 28) - - try: - course_input = input("请输入想要学习的课程列表,以逗号分隔,例: 2151141,189191,198198\n").strip() - if course_input: - course_list = [c.strip() for c in course_input.split(",")] - self._filter_courses(course_list) - except Exception as e: - raise FormatError("输入格式错误") from e +# 关闭警告 +disable_warnings(exceptions.InsecureRequestWarning) - def _filter_courses(self, course_list: List[str]) -> None: - """根据课程ID列表筛选课程""" - self.selected_courses = [ - course for course in self.all_courses - if course["courseId"] in course_list - ] +def init_config(): + parser = argparse.ArgumentParser(description='Samueli924/chaoxing') # 命令行传参 + parser.add_argument("-c", "--config", type=str, default=None, help="使用配置文件运行程序") + parser.add_argument("-u", "--username", type=str, default=None, help="手机号账号") + parser.add_argument("-p", "--password", type=str, default=None, help="登录密码") + parser.add_argument("-l", "--list", type=str, default=None, help="要学习的课程ID列表") + parser.add_argument("-s", "--speed", type=float, default=1.0, help="视频播放倍速(默认1,最大2)") + args = parser.parse_args() + if args.config: + config = configparser.ConfigParser() + config.read(args.config, encoding="utf8") + return (config.get("common", "username"), + config.get("common", "password"), + str(config.get("common", "course_list")).split(",") if config.get("common", "course_list") else None, + int(config.get("common", "speed")), + config['tiku'] + ) + else: + return (args.username, args.password, args.list.split(",") if args.list else None, int(args.speed) if args.speed else 1,None) class RollBackManager: - """任务回滚管理类""" - - MAX_ROLLBACK = 3 - def __init__(self) -> None: - self.rollback_times: int = 0 - self.rollback_id: str = "" - - def add_times(self, id: str) -> None: - """ - 增加回滚次数 - - Args: - id: 任务点ID - - Raises: - MaxRollBackError: 超过最大回滚次数 - """ - if id == self.rollback_id: - if self.rollback_times >= self.MAX_ROLLBACK: - raise MaxRollBackError("回滚次数已达3次,请手动检查学习通任务点完成情况") - self.rollback_times += 1 - else: + self.rollback_times = 0 + self.rollback_id = "" + + def add_times(self,id:str) -> None: + if id == self.rollback_id and self.rollback_times == 3: + raise MaxRollBackError("回滚次数已达3次,请手动检查学习通任务点完成情况") + elif id != self.rollback_id: + # 新job self.rollback_id = id self.rollback_times = 1 + else: + self.rollback_times += 1 -def init_config() -> Config: - """ - 初始化配置 - - Returns: - Config: 配置对象 - """ - parser = argparse.ArgumentParser(description='超星学习通自动化学习工具') - parser.add_argument("-c", "--config", type=str, help="使用配置文件运行程序") - parser.add_argument("-u", "--username", type=str, help="手机号账号") - parser.add_argument("-p", "--password", type=str, help="登录密码") - parser.add_argument("-l", "--list", type=str, help="要学习的课程ID列表") - parser.add_argument("-s", "--speed", type=float, default=1.0, help="视频播放倍速(默认1,最大2)") - - args = parser.parse_args() - - if args.config: - return _load_config_file(args.config) - - return Config( - username=args.username, - password=args.password, - course_list=args.list.split(",") if args.list else None, - speed=min(2.0, max(1.0, args.speed)), - tiku_config=None - ) - -def _load_config_file(config_path: str) -> Config: - """ - 从配置文件加载配置 - - Args: - config_path: 配置文件路径 - - Returns: - Config: 配置对象 - """ - config = configparser.ConfigParser() - config.read(config_path, encoding="utf8") - - return Config( - username=config.get("common", "username"), - password=config.get("common", "password"), - course_list=config.get("common", "course_list", fallback="").split(",") if config.get("common", "course_list") else None, - speed=float(config.get("common", "speed")), - tiku_config=dict(config["tiku"]) if "tiku" in config and config.get("tiku", "tokens", fallback="").strip() else None - ) - -def process_course_tasks(chaoxing: Chaoxing, course: Dict, speed: float, tiku: Tiku) -> None: - """ - 处理单个课程的所有任务 - - Args: - chaoxing: Chaoxing实例 - course: 课程信息 - speed: 视频播放速度 - tiku: 题库实例 - """ - logger.info(f"开始学习课程: {course['title']}") - - # 获取课程章节 - point_list = chaoxing.get_course_point( - course["courseId"], - course["clazzId"], - course["cpi"] - ) - - rb_manager = RollBackManager() - point_index = 0 - - # 遍历章节 - while point_index < len(point_list["points"]): - point = point_list["points"][point_index] - logger.info(f'当前章节: {point["title"]}') - - try: - process_chapter(chaoxing, course, point, point_index, speed, tiku, rb_manager) - point_index += 1 - except MaxRollBackError as e: - logger.error(str(e)) - break - -def process_chapter( - chaoxing: Chaoxing, - course: Dict, - point: Dict, - point_index: int, - speed: float, - tiku: Tiku, - rb_manager: RollBackManager -) -> None: - """ - 处理单个章节的任务 - - Args: - chaoxing: Chaoxing实例 - course: 课程信息 - point: 章节信息 - point_index: 章节索引 - speed: 视频播放速度 - tiku: 题库实例 - rb_manager: 回滚管理器 - """ - jobs, job_info = chaoxing.get_job_list( - course["clazzId"], - course["courseId"], - course["cpi"], - point["id"] - ) - - # 处理未开放章节 - if job_info.get('notOpen', False): - handle_locked_chapter(point, tiku, rb_manager) - return - - # 处理章节任务 - if jobs: - for job in jobs: - process_job(chaoxing, course, point, job, job_info, speed) - -def handle_locked_chapter(point: Dict, tiku: Optional[Tiku], rb_manager: RollBackManager) -> None: - """ - 处理未开放的章节 - - Args: - point: 章节信息 - tiku: 题库实例 - rb_manager: 回滚管理器 - - Raises: - MaxRollBackError: 章节未开放且无法处理 - """ - if not tiku or tiku.DISABLE or not tiku.SUBMIT: - logger.error("章节未开启,可能由于上一章节的章节检测未完成,请手动完成并提交再重试,或者开启题库并启用提交") - raise MaxRollBackError("章节未开放") - rb_manager.add_times(point["id"]) - -def process_job( - chaoxing: Chaoxing, - course: Dict, - point: Dict, - job: Dict, - job_info: Dict, - speed: float -) -> None: - """ - 处理单个任务点 - - Args: - chaoxing: Chaoxing实例 - course: 课程信息 - point: 章节信息 - job: 任务信息 - job_info: 任务详细信息 - speed: 视频播放速度 - """ - job_type_handlers = { - "video": handle_video_job, - "document": handle_document_job, - "workid": handle_work_job, - "read": handle_read_job - } - - handler = job_type_handlers.get(job["type"]) - if handler: - handler(chaoxing, course, point, job, job_info, speed) - -def handle_video_job( - chaoxing: Chaoxing, - course: Dict, - point: Dict, - job: Dict, - job_info: Dict, - speed: float -) -> None: - """处理视频任务""" - logger.trace(f"识别到视频任务, 任务章节: {course['title']} 任务ID: {job['jobid']}") - try: - chaoxing.study_video(course, job, job_info, speed=speed, type_="Video") - except JSONDecodeError: - logger.warning("当前任务非视频任务,正在尝试音频任务解码") - try: - chaoxing.study_video(course, job, job_info, speed=speed, type_="Audio") - except JSONDecodeError: - logger.warning(f"出现异常任务 -> 任务章节: {course['title']} 任务ID: {job['jobid']}, 已跳过") - -def handle_document_job( - chaoxing: Chaoxing, - course: Dict, - point: Dict, - job: Dict, - job_info: Dict, - speed: float -) -> None: - """处理文档任务""" - logger.trace(f"识别到文档任务, 任务章节: {course['title']} 任务ID: {job['jobid']}") - chaoxing.study_document(course, job) - -def handle_work_job( - chaoxing: Chaoxing, - course: Dict, - point: Dict, - job: Dict, - job_info: Dict, - speed: float -) -> None: - """处理测验任务""" - logger.trace(f"识别到章节检测任务, 任务章节: {course['title']}") - chaoxing.study_work(course, job, job_info) - -def handle_read_job( - chaoxing: Chaoxing, - course: Dict, - point: Dict, - job: Dict, - job_info: Dict, - speed: float -) -> None: - """处理阅读任务""" - logger.trace(f"识别到阅读任务, 任务章节: {course['title']}") - chaoxing.study_read(course, job, job_info) -def main() -> None: - """主函数""" +if __name__ == '__main__': try: - # 初始化配置 - config = init_config() - - # 获取登录信息 - if not config.username or not config.password: - config.username = input("请输入你的手机号,按回车确认\n手机号:").strip() - config.password = input("请输入你的密码,按回车确认\n密码:").strip() - - # 初始化账号 - account = Account(config.username, config.password) - - # 初始化题库 - tiku = None - if config.tiku_config: - tiku = Tiku() - tiku.config_set(config.tiku_config) - tiku = tiku.get_tiku_from_config() - tiku.init_tiku() - - # 初始化超星API - chaoxing = Chaoxing(account=account, tiku=tiku) - - # 登录检查 - login_state = chaoxing.login() - if not login_state["status"]: - raise LoginError(login_state["msg"]) - - # 课程管理 - course_manager = CourseManager(chaoxing) - course_manager.load_courses() - course_manager.select_courses(config.course_list) - - # 处理所有课程 - for course in course_manager.selected_courses: - process_course_tasks(chaoxing, course, config.speed, tiku) - + # 避免异常的无限回滚 + RB = RollBackManager() + # 初始化登录信息 + username, password, course_list, speed,tiku_config= init_config() + # 规范化播放速度的输入值 + speed = min(2.0, max(1.0, speed)) + if (not username) or (not password): + username = input("请输入你的手机号,按回车确认\n手机号:") + password = input("请输入你的密码,按回车确认\n密码:") + account = Account(username, password) + # 设置题库 + tiku = Tiku() + tiku.config_set(tiku_config) # 载入配置 + tiku = tiku.get_tiku_from_config() # 载入题库 + tiku.init_tiku() # 初始化题库 + # 实例化超星API + chaoxing = Chaoxing(account=account,tiku=tiku) + # 检查当前登录状态,并检查账号密码 + _login_state = chaoxing.login() + if not _login_state["status"]: + raise LoginError(_login_state["msg"]) + # 获取所有的课程列表 + all_course = chaoxing.get_course_list() + course_task = [] + # 手动输入要学习的课程ID列表 + if not course_list: + print("*" * 10 + "课程列表" + "*" * 10) + for course in all_course: + print(f"ID: {course['courseId']} 课程名: {course['title']}") + print("*" * 28) + try: + course_list = input("请输入想要学习的课程列表,以逗号分隔,例: 2151141,189191,198198\n").split(",") + except Exception as e: + raise FormatError("输入格式错误") from e + # 筛选需要学习的课程 + for course in all_course: + if course["courseId"] in course_list: + course_task.append(course) + if not course_task: + course_task = all_course + # 开始遍历要学习的课程列表 + logger.info(f"课程列表过滤完毕,当前课程任务数量: {len(course_task)}") + for course in course_task: + logger.info(f"开始学习课程: {course['title']}") + # 获取当前课程的所有章节 + point_list = chaoxing.get_course_point(course["courseId"], course["clazzId"], course["cpi"]) + + # 为了支持课程任务回滚,采用下标方式遍历任务点 + __point_index = 0 + while __point_index < len(point_list["points"]): + point = point_list["points"][__point_index] + logger.info(f'当前章节: {point["title"]}') + # 获取当前章节的所有任务点 + jobs = [] + job_info = None + jobs, job_info = chaoxing.get_job_list(course["clazzId"], course["courseId"], course["cpi"], point["id"]) + + bookID = job_info["knowledgeid"] # 获取视频ID + + # 发现未开放章节,尝试回滚上一个任务重新完成一次 + try: + if job_info.get('notOpen',False): + __point_index -= 1 # 默认第一个任务总是开放的 + # 针对题库启用情况 + if not tiku or tiku.DISABLE or not tiku.SUBMIT: + # 未启用题库或未开启题库提交,章节检测未完成会导致无法开始下一章,直接退出 + logger.error(f"章节未开启,可能由于上一章节的章节检测未完成,请手动完成并提交再重试,或者开启题库并启用提交") + break + RB.add_times(point["id"]) + continue + except MaxRollBackError as e: + logger.error("回滚次数已达3次,请手动检查学习通任务点完成情况") + # 跳过该课程,继续下一课程 + break + + + # 可能存在章节无任何内容的情况 + if not jobs: + __point_index += 1 + continue + # 遍历所有任务点 + for job in jobs: + # 视频任务 + if job["type"] == "video": + + TextBookID = getText() # 获取学习过的课程ID + if TextBookID.count(bookID) > 0: + logger.info(f"课程: {course['title']} 章节: {point['title']} 任务: {job['title']} 已学习过或在学习中,跳过") # 如果已经学习过该课程,则跳过 + break # 如果已经学习过该课程,则跳过 + appendText(bookID) # 记录正在学习的课程ID + + logger.trace(f"识别到视频任务, 任务章节: {course['title']} 任务ID: {job['jobid']}") + # 超星的接口没有返回当前任务是否为Audio音频任务 + isAudio = False + try: + chaoxing.study_video(course, job, job_info, _speed=speed, _type="Video") + except JSONDecodeError as e: + logger.warning("当前任务非视频任务,正在尝试音频任务解码") + isAudio = True + if isAudio: + try: + chaoxing.study_video(course, job, job_info, _speed=speed, _type="Audio") + except JSONDecodeError as e: + logger.warning(f"出现异常任务 -> 任务章节: {course['title']} 任务ID: {job['jobid']}, 已跳过") + # 文档任务 + elif job["type"] == "document": + logger.trace(f"识别到文档任务, 任务章节: {course['title']} 任务ID: {job['jobid']}") + chaoxing.study_document(course, job) + # 测验任务 + elif job["type"] == "workid": + logger.trace(f"识别到章节检测任务, 任务章节: {course['title']}") + chaoxing.study_work(course, job,job_info) + # 阅读任务 + elif job["type"] == "read": + logger.trace(f"识别到阅读任务, 任务章节: {course['title']}") + chaoxing.strdy_read(course, job,job_info) + __point_index += 1 logger.info("所有课程学习任务已完成") - - except Exception as e: + except BaseException as e: + import traceback logger.error(f"错误: {type(e).__name__}: {e}") logger.error(traceback.format_exc()) - raise - -if __name__ == '__main__': - main() \ No newline at end of file + raise e \ No newline at end of file