diff --git a/.gitignore b/.gitignore index fea5fc26..9a6cee37 100644 --- a/.gitignore +++ b/.gitignore @@ -139,9 +139,11 @@ cookies.txt .config.ini config.ini chaoxing.log +config*.ini .chaoxing.log ./config.ini ./chaoxing.log ./cookies.txt .idea/ -cache.json +.vscode/ +cache.json \ No newline at end of file diff --git a/api/answer.py b/api/answer.py index 330f6439..76acddad 100644 --- a/api/answer.py +++ b/api/answer.py @@ -68,7 +68,7 @@ def token(self,value): self._token = value def init_tiku(self): - # 仅用于题库初始化,应该在题库载入后作初始化调用,随后才可以使用题库 + # 仅用于题库初始化, 应该在题库载入后作初始化调用, 随后才可以使用题库 # 尝试根据配置文件设置提交模式 if not self._conf: self.config_set(self._get_conf()) @@ -79,7 +79,7 @@ def init_tiku(self): self._init_tiku() def _init_tiku(self): - # 仅用于题库初始化,例如配置token,交由自定义题库完成 + # 仅用于题库初始化, 例如配置token, 交由自定义题库完成 pass def config_set(self,config): @@ -87,14 +87,14 @@ def config_set(self,config): def _get_conf(self): """ - 从默认配置文件查询配置,如果未能查到,停用题库 + 从默认配置文件查询配置, 如果未能查到, 停用题库 """ try: config = configparser.ConfigParser() config.read(self.CONFIG_PATH, encoding="utf8") return config['tiku'] except KeyError or FileNotFoundError: - logger.info("未找到tiku配置,已忽略题库功能") + logger.info("未找到tiku配置, 已忽略题库功能") self.DISABLE = True return None @@ -102,9 +102,11 @@ def query(self,q_info:dict): if self.DISABLE: return None - # 预处理,去除【单选题】这样与标题无关的字段 + # 预处理, 去除【单选题】这样与标题无关的字段 # 此处需要改进!!! + logger.debug(f"原始标题:{q_info['title']}") q_info['title'] = q_info['title'][6:] # 暂时直接用裁切解决 + logger.debug(f"处理后标题:{q_info['title']}") # 先过缓存 cache_dao = CacheDAO() @@ -123,13 +125,13 @@ def query(self,q_info:dict): return None def _query(self,q_info:dict): """ - 查询接口,交由自定义题库实现 + 查询接口, 交由自定义题库实现 """ pass def get_tiku_from_config(self): """ - 从配置文件加载题库,这个配置可以是用户提供,可以是默认配置文件 + 从配置文件加载题库, 这个配置可以是用户提供, 可以是默认配置文件 """ if not self._conf: # 尝试从默认配置文件加载 @@ -141,7 +143,7 @@ def get_tiku_from_config(self): if not cls_name: raise KeyError except KeyError: - logger.error("未找到题库配置,已忽略题库功能") + logger.error("未找到题库配置, 已忽略题库功能") return self new_cls = globals()[cls_name]() new_cls.config_set(self._conf) @@ -149,7 +151,7 @@ def get_tiku_from_config(self): def jugement_select(self,answer:str) -> bool: """ - 这是一个专用的方法,要求配置维护两个选项列表,一份用于正确选项,一份用于错误选项,以应对题库对判断题答案响应的各种可能的情况 + 这是一个专用的方法, 要求配置维护两个选项列表, 一份用于正确选项, 一份用于错误选项, 以应对题库对判断题答案响应的各种可能的情况 它的作用是将获取到的答案answer与可能的选项列对比并返回对应的布尔值 """ if self.DISABLE: @@ -163,15 +165,15 @@ def jugement_select(self,answer:str) -> bool: elif answer in false_list: return False else: - # 无法判断,随机选择 - logger.error(f'无法判断答案 -> {answer} 对应的是正确还是错误,请自行判断并加入配置文件重启脚本,本次将会随机选择选项') + # 无法判断, 随机选择 + logger.error(f'无法判断答案 -> {answer} 对应的是正确还是错误, 请自行判断并加入配置文件重启脚本, 本次将会随机选择选项') return random.choice([True,False]) def get_submit_params(self): """ - 这是一个专用方法,用于根据当前设置的提交模式,响应对应的答题提交API中的pyFlag值 + 这是一个专用方法, 用于根据当前设置的提交模式, 响应对应的答题提交API中的pyFlag值 """ - # 留空直接提交,1保存但不提交 + # 留空直接提交, 1保存但不提交 if self.SUBMIT: return "" else: @@ -187,7 +189,7 @@ def __init__(self) -> None: self.api = 'https://tk.enncy.cn/query' self._token = None self._token_index = 0 # token队列计数器 - self._times = 100 # 查询次数剩余,初始化为100,查询后校对修正 + self._times = 100 # 查询次数剩余, 初始化为100, 查询后校对修正 def _query(self,q_info:dict): res = requests.get( @@ -201,14 +203,14 @@ def _query(self,q_info:dict): if res.status_code == 200: res_json = res.json() if not res_json['code']: - # 如果是因为TOKEN次数到期,则更换token + # 如果是因为TOKEN次数到期, 则更换token if self._times == 0 or '次数不足' in res_json['data']['answer']: - logger.info(f'TOKEN查询次数不足,将会更换并重新搜题') + logger.info(f'TOKEN查询次数不足, 将会更换并重新搜题') self._token_index += 1 self.load_token() # 重新查询 return self._query(q_info) - logger.error(f'{self.name}查询失败:\n剩余查询数{res_json["data"].get("times",f"{self._times}(仅参考)")}:\n消息:{res_json["message"]}') + logger.error(f'{self.name}查询失败:\n\t剩余查询数{res_json["data"].get("times",f"{self._times}(仅参考)")}:\n\t消息:{res_json["message"]}') return None self._times = res_json["data"].get("times",self._times) return res_json['data']['answer'].strip() @@ -220,8 +222,8 @@ def load_token(self): token_list = self._conf['tokens'].split(',') if self._token_index == len(token_list): # TOKEN 用完 - logger.error('TOKEN用完,请自行更换再重启脚本') - raise Exception(f'{self.name} TOKEN 已用完,请更换') + logger.error('TOKEN用完, 请自行更换再重启脚本') + raise Exception(f'{self.name} TOKEN 已用完, 请更换') self._token = token_list[self._token_index] def _init_tiku(self): @@ -260,7 +262,7 @@ def _query(self, q_info: dict): if res.status_code == 200: res_json = res.json() if bool(res_json['plat']): - logger.error("查询失败,返回:" + res.text) + logger.error("查询失败, 返回:" + res.text) return None sep = "\n" return sep.join(res_json['answer']['allAnswer'][0]).strip() diff --git a/api/base.py b/api/base.py index b942bbc9..dbb1586c 100644 --- a/api/base.py +++ b/api/base.py @@ -11,14 +11,16 @@ from api.cookies import save_cookies, use_cookies from api.process import show_progress from api.config import GlobalConst as gc -from api.decode import (decode_course_list, - decode_course_point, - decode_course_card, - decode_course_folder, - decode_questions_info - ) +from api.decode import ( + decode_course_list, + decode_course_point, + decode_course_card, + decode_course_folder, + decode_questions_info, +) from api.answer import * + def get_timestamp(): return str(int(time.time() * 1000)) @@ -30,8 +32,8 @@ def get_random_seconds(): def init_session(isVideo: bool = False, isAudio: bool = False): _session = requests.session() _session.verify = False - _session.mount('http://', HTTPAdapter(max_retries=3)) - _session.mount('https://', HTTPAdapter(max_retries=3)) + _session.mount("http://", HTTPAdapter(max_retries=3)) + _session.mount("https://", HTTPAdapter(max_retries=3)) if isVideo: _session.headers = gc.VIDEO_HEADERS elif isAudio: @@ -47,13 +49,14 @@ class Account: password = None last_login = None isSuccess = None + def __init__(self, _username, _password): self.username = _username self.password = _password class Chaoxing: - def __init__(self, account: Account = None,tiku:Tiku=None): + def __init__(self, account: Account = None, tiku: Tiku = None): self.account = account self.cipher = AESCipher() self.tiku = tiku @@ -62,16 +65,17 @@ def login(self): _session = requests.session() _session.verify = False _url = "https://passport2.chaoxing.com/fanyalogin" - _data = {"fid": "-1", - "uname": self.cipher.encrypt(self.account.username), - "password": self.cipher.encrypt(self.account.password), - "refer": "https%3A%2F%2Fi.chaoxing.com", - "t": True, - "forbidotherlogin": 0, - "validate": "", - "doubleFactorLogin": 0, - "independentId": 0 - } + _data = { + "fid": "-1", + "uname": self.cipher.encrypt(self.account.username), + "password": self.cipher.encrypt(self.account.password), + "refer": "https%3A%2F%2Fi.chaoxing.com", + "t": True, + "forbidotherlogin": 0, + "validate": "", + "doubleFactorLogin": 0, + "independentId": 0, + } logger.trace("正在尝试登录...") resp = _session.post(_url, headers=gc.HEADERS, data=_data) if resp and resp.json()["status"] == True: @@ -92,21 +96,16 @@ def get_uid(self): def get_course_list(self): _session = init_session() _url = "https://mooc2-ans.chaoxing.com/mooc2-ans/visit/courselistdata" - _data = { - "courseType": 1, - "courseFolderId": 0, - "query": "", - "superstarClass": 0 - } + _data = {"courseType": 1, "courseFolderId": 0, "query": "", "superstarClass": 0} logger.trace("正在读取所有的课程列表...") - # 接口突然抽风,增加headers + # 接口突然抽风, 增加headers _headers = { "Host": "mooc2-ans.chaoxing.com", - "sec-ch-ua-platform": "\"Windows\"", + "sec-ch-ua-platform": '"Windows"', "X-Requested-With": "XMLHttpRequest", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36 Edg/129.0.0.0", "Accept": "text/html, */*; q=0.01", - "sec-ch-ua": "\"Microsoft Edge\";v=\"129\", \"Not=A?Brand\";v=\"8\", \"Chromium\";v=\"129\"", + "sec-ch-ua": '"Microsoft Edge";v="129", "Not=A?Brand";v="8", "Chromium";v="129"', "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8", "sec-ch-ua-mobile": "?0", "Origin": "https://mooc2-ans.chaoxing.com", @@ -114,9 +113,9 @@ def get_course_list(self): "Sec-Fetch-Mode": "cors", "Sec-Fetch-Dest": "empty", "Referer": "https://mooc2-ans.chaoxing.com/mooc2-ans/visit/interaction?moocDomain=https://mooc1-1.chaoxing.com/mooc-ans", - "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6,ja;q=0.5" + "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6,ja;q=0.5", } - _resp = _session.post(_url,headers=_headers,data=_data) + _resp = _session.post(_url, headers=_headers, data=_data) # logger.trace(f"原始课程列表内容:\n{_resp.text}") logger.info("课程列表读取完毕...") course_list = decode_course_list(_resp.text) @@ -129,7 +128,7 @@ def get_course_list(self): "courseType": 1, "courseFolderId": folder["id"], "query": "", - "superstarClass": 0 + "superstarClass": 0, } _resp = _session.post(_url, data=_data) course_list += decode_course_list(_resp.text) @@ -148,13 +147,17 @@ def get_job_list(self, _clazzid, _courseid, _cpi, _knowledgeid): _session = init_session() job_list = [] job_info = {} - for _possible_num in ["0", "1","2"]: # 学习界面任务卡片数,很少有3个的,但是对于章节解锁任务点少一个都不行,可以从API /mooc-ans/mycourse/studentstudyAjax获取值,或者干脆直接加,但二者都会造成额外的请求 + for _possible_num in [ + "0", + "1", + "2", + ]: # 学习界面任务卡片数, 很少有3个的, 但是对于章节解锁任务点少一个都不行, 可以从API /mooc-ans/mycourse/studentstudyAjax获取值, 或者干脆直接加, 但二者都会造成额外的请求 _url = f"https://mooc1.chaoxing.com/mooc-ans/knowledge/cards?clazzid={_clazzid}&courseid={_courseid}&knowledgeid={_knowledgeid}&num={_possible_num}&ut=s&cpi={_cpi}&v=20160407-3&mooc2=1" logger.trace("开始读取章节所有任务点...") _resp = _session.get(_url) _job_list, _job_info = decode_course_card(_resp.text) - if _job_info.get('notOpen',False): - # 直接返回,节省一次请求 + if _job_info.get("notOpen", False): + # 直接返回, 节省一次请求 logger.info("该章节未开放") return [], _job_info job_list += _job_list @@ -167,47 +170,61 @@ def get_job_list(self, _clazzid, _courseid, _cpi, _knowledgeid): def get_enc(self, clazzId, jobid, objectId, playingTime, duration, userid): return md5( - f"[{clazzId}][{userid}][{jobid}][{objectId}][{playingTime * 1000}][d_yHJ!$pdA~5][{duration * 1000}][0_{duration}]" - .encode()).hexdigest() - - def video_progress_log(self, _session, _course, _job, _job_info, _dtoken, _duration, _playingTime, _type: str = "Video"): - if "courseId" in _job['otherinfo']: + f"[{clazzId}][{userid}][{jobid}][{objectId}][{playingTime * 1000}][d_yHJ!$pdA~5][{duration * 1000}][0_{duration}]".encode() + ).hexdigest() + + def video_progress_log( + self, + _session, + _course, + _job, + _job_info, + _dtoken, + _duration, + _playingTime, + _type: str = "Video", + ): + if "courseId" in _job["otherinfo"]: _mid_text = f"otherInfo={_job['otherinfo']}&" else: _mid_text = f"otherInfo={_job['otherinfo']}&courseId={_course['courseId']}&" _success = False for _possible_rt in ["0.9", "1"]: - _url = (f"https://mooc1.chaoxing.com/mooc-ans/multimedia/log/a/" - f"{_course['cpi']}/" - f"{_dtoken}?" - f"clazzId={_course['clazzId']}&" - f"playingTime={_playingTime}&" - f"duration={_duration}&" - f"clipTime=0_{_duration}&" - f"objectId={_job['objectid']}&" - f"{_mid_text}" - f"jobid={_job['jobid']}&" - f"userid={self.get_uid()}&" - f"isdrag=3&" - f"view=pc&" - f"enc={self.get_enc(_course['clazzId'], _job['jobid'], _job['objectid'], _playingTime, _duration, self.get_uid())}&" - f"rt={_possible_rt}&" - f"dtype={_type}&" - f"_t={get_timestamp()}") + _url = ( + f"https://mooc1.chaoxing.com/mooc-ans/multimedia/log/a/" + f"{_course['cpi']}/" + f"{_dtoken}?" + f"clazzId={_course['clazzId']}&" + f"playingTime={_playingTime}&" + f"duration={_duration}&" + f"clipTime=0_{_duration}&" + f"objectId={_job['objectid']}&" + f"{_mid_text}" + f"jobid={_job['jobid']}&" + f"userid={self.get_uid()}&" + f"isdrag=3&" + f"view=pc&" + f"enc={self.get_enc(_course['clazzId'], _job['jobid'], _job['objectid'], _playingTime, _duration, self.get_uid())}&" + f"rt={_possible_rt}&" + f"dtype={_type}&" + f"_t={get_timestamp()}" + ) resp = _session.get(_url) if resp.status_code == 200: _success = True - break # 如果返回为200正常,则跳出循环 + break # 如果返回为200正常, 则跳出循环 elif resp.status_code == 403: - continue # 如果出现403无权限报错,则继续尝试不同的rt参数 + continue # 如果出现403无权限报错, 则继续尝试不同的rt参数 if _success: return resp.json() else: - # 若出现两个rt参数都返回403的情况,则跳过当前任务 - logger.warning("出现403报错,尝试修复无效,正在跳过当前任务点...") + # 若出现两个rt参数都返回403的情况, 则跳过当前任务 + logger.warning("出现403报错, 尝试修复无效, 正在跳过当前任务点...") return False - def study_video(self, _course, _job, _job_info, _speed: float = 1.0, _type: str = "Video"): + def study_video( + self, _course, _job, _job_info, _speed: float = 1.0, _type: str = "Video" + ): if _type == "Video": _session = init_session(isVideo=True) else: @@ -227,7 +244,16 @@ def study_video(self, _course, _job, _job_info, _speed: float = 1.0, _type: str while not _isFinished: if _isFinished: _playingTime = _duration - _isPassed = self.video_progress_log(_session, _course, _job, _job_info, _dtoken, _duration, _playingTime, _type) + _isPassed = self.video_progress_log( + _session, + _course, + _job, + _job_info, + _dtoken, + _duration, + _playingTime, + _type, + ) if not _isPassed or (_isPassed and _isPassed["isPassed"]): break _wait_time = get_random_seconds() @@ -235,192 +261,269 @@ def study_video(self, _course, _job, _job_info, _speed: float = 1.0, _type: str _wait_time = int(_duration) - _playingTime _isFinished = True # 播放进度条 - show_progress(_job['name'], _playingTime, _wait_time, _duration, _speed) + show_progress(_job["name"], _playingTime, _wait_time, _duration, _speed) _playingTime += _wait_time print("\r", end="", flush=True) logger.info(f"任务完成: {_job['name']}") def study_document(self, _course, _job): + """ + Study a document in Chaoxing platform. + + This method makes a GET request to fetch document information for a given course and job. + + Args: + _course (dict): Dictionary containing course information with keys: + - courseId: ID of the course + - clazzId: ID of the class + _job (dict): Dictionary containing job information with keys: + - jobid: ID of the job + - otherinfo: String containing node information + - jtoken: Authentication token for the job + + Returns: + requests.Response: Response object from the GET request + + Note: + This method requires the following helper functions: + - init_session(): To initialize a new session + - get_timestamp(): To get current timestamp + - re module for regular expression matching + """ _session = init_session() _url = f"https://mooc1.chaoxing.com/ananas/job/document?jobid={_job['jobid']}&knowledgeid={re.findall(r'nodeId_(.*?)-', _job['otherinfo'])[0]}&courseid={_course['courseId']}&clazzid={_course['clazzId']}&jtoken={_job['jtoken']}&_dc={get_timestamp()}" _resp = _session.get(_url) - def study_work(self, _course, _job,_job_info) -> None: + def study_work(self, _course, _job, _job_info) -> None: if self.tiku.DISABLE or not self.tiku: return None - _ORIGIN_HTML_CONTENT = "" # 用于配合输出网页源码,帮助修复#391错误 + _ORIGIN_HTML_CONTENT = "" # 用于配合输出网页源码, 帮助修复#391错误 - def random_answer(options:str) -> str: - answer = '' + def random_answer(options: str) -> str: + answer = "" if not options: return answer - - if q['type'] == "multiple": + + if q["type"] == "multiple": + logger.debug(f"当前选项列表[cut前] -> {options}") _op_list = multi_cut(options) - for i in range(random.choices([2,3,4],weights=[0.1,0.5,0.4],k=1)[0]): # 此处表示随机多选答案几率:2个 10%,3个 50% ,4个 40% + logger.debug(f"当前选项列表[cut后] -> {_op_list}") + + if not _op_list: + logger.error( + "选项为空, 未能正确提取题目选项信息! 请反馈并提供以上信息" + ) + return answer + + for i in range( + random.choices([2, 3, 4], weights=[0.1, 0.5, 0.4], k=1)[0] + ): # 此处表示随机多选答案几率:2个 10%, 3个 50%, 4个 40% _choice = random.choice(_op_list) _op_list.remove(_choice) - answer+=_choice[:1] # 取首字为答案,例如A或B - # 对答案进行排序,否则会提交失败 + answer += _choice[:1] # 取首字为答案, 例如A或B + # 对答案进行排序, 否则会提交失败 answer = "".join(sorted(answer)) - elif q['type'] == "single": - answer = random.choice(options.split('\n'))[:1] # 取首字为答案,例如A或B + elif q["type"] == "single": + answer = random.choice(options.split("\n"))[ + :1 + ] # 取首字为答案, 例如A或B # 判断题处理 - elif q['type'] == "judgement": + elif q["type"] == "judgement": # answer = self.tiku.jugement_select(_answer) - answer = "true" if random.choice([True,False]) else "false" - logger.info(f'随机选择 -> {answer}') + answer = "true" if random.choice([True, False]) else "false" + logger.info(f"随机选择 -> {answer}") return answer - - def multi_cut(answer:str) -> list[str]: - cut_char = [',',',','|','\n','\r','\t','#','*','-','_','+','@','~','/','\\','.','&',' '] # 多选答案切割符 + + def multi_cut(answer: str) -> list[str]: + """ + 将多选题答案字符串按特定字符进行切割, 并返回切割后的答案列表. + + 参数: + answer (str): 多选题答案字符串. + + 返回: + list[str]: 切割后的答案列表, 如果无法切割, 则返回默认的选项列表 ['A', 'B', 'C', 'D']. + + 注意: + 如果无法从网页中提取题目信息, 将记录警告日志并返回默认选项列表. + """ + # cut_char = [',',',','|','\n','\r','\t','#','*','-','_','+','@','~','/','\\','.','&',' '] # 多选答案切割符 + # ',' 在常规被正确划分的, 选项中出现, 导致 multi_cut 无法正确划分选项 #391 + # IndexError: Cannot choose from an empty sequence #391 + # 同时为了避免没有考虑到的 case, 应该先按照 '\n' 匹配, 匹配不到再按照其他字符匹配 + cut_char = [ + "\n", + ",", + ",", + "|", + "\r", + "\t", + "#", + "*", + "-", + "_", + "+", + "@", + "~", + "/", + "\\", + ".", + "&", + " ", + "、", + ] # 多选答案切割符 res = [] for char in cut_char: - res = answer.split(char) - if len(res)>1: + res = [ + opt for opt in answer.split(char) if opt.strip() + ] # Filter empty strings + if len(res) > 1: return res - logger.warning(f"未能从网页中提取题目信息,以下为相关信息:\n{answer}\n\n{_ORIGIN_HTML_CONTENT}\n") # 尝试输出网页内容和选项信息 - logger.warning("未能正确提取题目选项信息!请反馈并提供以上信息。") - return ['A','B','C','D'] # 默认多选题为4个选项 - + logger.warning( + f"未能从网页中提取题目信息, 以下为相关信息:\n\t{answer}\n\n{_ORIGIN_HTML_CONTENT}\n" + ) # 尝试输出网页内容和选项信息 + logger.warning("未能正确提取题目选项信息! 请反馈并提供以上信息") + return ["A", "B", "C", "D"] # 默认多选题为4个选项 - # 学习通这里根据参数差异能重定向至两个不同接口,需要定向至https://mooc1.chaoxing.com/mooc-ans/workHandle/handle + # 学习通这里根据参数差异能重定向至两个不同接口, 需要定向至https://mooc1.chaoxing.com/mooc-ans/workHandle/handle _session = init_session() - headers={ + headers = { "Host": "mooc1.chaoxing.com", - "sec-ch-ua": "\"Microsoft Edge\";v=\"129\", \"Not=A?Brand\";v=\"8\", \"Chromium\";v=\"129\"", + "sec-ch-ua": '"Microsoft Edge";v="129", "Not=A?Brand";v="8", "Chromium";v="129"', "sec-ch-ua-mobile": "?0", - "sec-ch-ua-platform": "\"Windows\"", + "sec-ch-ua-platform": '"Windows"', "Upgrade-Insecure-Requests": "1", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36 Edg/129.0.0.0", "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7", "Sec-Fetch-Site": "same-origin", "Sec-Fetch-Mode": "navigate", "Sec-Fetch-Dest": "iframe", - "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6,ja;q=0.5" + "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6,ja;q=0.5", } cookies = _session.cookies.get_dict() - - _url = "https://mooc1.chaoxing.com/mooc-ans/api/work" + _url = "https://mooc1.chaoxing.com/mooc-ans/api/work" _resp = requests.get( _url, headers=headers, cookies=cookies, verify=False, - params = { + params={ "api": "1", - "workId": _job['jobid'].replace("work-",""), - "jobid": _job['jobid'], - "originJobId": _job['jobid'], + "workId": _job["jobid"].replace("work-", ""), + "jobid": _job["jobid"], + "originJobId": _job["jobid"], "needRedirect": "true", "skipHeader": "true", - "knowledgeid": str(_job_info['knowledgeid']), - 'ktoken': _job_info['ktoken'], - "cpi": _job_info['cpi'], + "knowledgeid": str(_job_info["knowledgeid"]), + "ktoken": _job_info["ktoken"], + "cpi": _job_info["cpi"], "ut": "s", - "clazzId": _course['clazzId'], + "clazzId": _course["clazzId"], "type": "", - "enc": _job['enc'], + "enc": _job["enc"], "mooc2": "1", - "courseid": _course['courseId'] - } + "courseid": _course["courseId"], + }, ) - _ORIGIN_HTML_CONTENT = _resp.text # 用于配合输出网页源码,帮助修复#391错误 - questions = decode_questions_info(_resp.text) # 加载题目信息 + _ORIGIN_HTML_CONTENT = _resp.text # 用于配合输出网页源码, 帮助修复#391错误 + questions = decode_questions_info(_resp.text) # 加载题目信息 # 搜题 - for q in questions['questions']: + for q in questions["questions"]: + logger.debug(f"当前题目信息 -> {q}") res = self.tiku.query(q) - answer = '' + answer = "" if not res: # 随机答题 - answer = random_answer(q['options']) + answer = random_answer(q["options"]) else: # 根据响应结果选择答案 - options_list = multi_cut(q['options']) - if q['type'] == "multiple": + options_list = multi_cut(q["options"]) + if q["type"] == "multiple": # 多选处理 for _a in multi_cut(res): for o in options_list: - if _a.upper() in o: # 题库返回的答案可能包含选项,如A,B,C,全部转成大写与学习通一致 + if ( + _a.upper() in o + ): # 题库返回的答案可能包含选项, 如A, B, C, 全部转成大写与学习通一致 answer += o[:1] - # 对答案进行排序,否则会提交失败 + # 对答案进行排序, 否则会提交失败 answer = "".join(sorted(answer)) - elif q['type'] == 'judgement': - answer = 'true' if self.tiku.jugement_select(res) else 'false' + elif q["type"] == "judgement": + answer = "true" if self.tiku.jugement_select(res) else "false" else: for o in options_list: if res in o: answer = o[:1] break - # 如果未能匹配,依然随机答题 - answer = answer if answer else random_answer(q['options']) + # 如果未能匹配, 依然随机答题 + logger.info(f"找到答案但答案未能匹配 -> {res}\t随机选择答案") + answer = answer if answer else random_answer(q["options"]) # 填充答案 - q['answerField'][f'answer{q["id"]}'] = answer + q["answerField"][f'answer{q["id"]}'] = answer logger.info(f'{q["title"]} 填写答案为 {answer}') - + # 提交模式 现在与题库绑定 - questions['pyFlag'] = self.tiku.get_submit_params() + questions["pyFlag"] = self.tiku.get_submit_params() # 组建提交表单 for q in questions["questions"]: - questions.update({ - f'answer{q["id"]}':q['answerField'][f'answer{q["id"]}'], - f'answertype{q["id"]}':q['answerField'][f'answertype{q["id"]}'] - }) - + questions.update( + { + f'answer{q["id"]}': q["answerField"][f'answer{q["id"]}'], + f'answertype{q["id"]}': q["answerField"][f'answertype{q["id"]}'], + } + ) del questions["questions"] res = _session.post( - 'https://mooc1.chaoxing.com/mooc-ans/work/addStudentWorkNew', + "https://mooc1.chaoxing.com/mooc-ans/work/addStudentWorkNew", data=questions, - headers= { + headers={ "Host": "mooc1.chaoxing.com", - "sec-ch-ua-platform": "\"Windows\"", + "sec-ch-ua-platform": '"Windows"', "X-Requested-With": "XMLHttpRequest", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36 Edg/129.0.0.0", "Accept": "application/json, text/javascript, */*; q=0.01", - "sec-ch-ua": "\"Microsoft Edge\";v=\"129\", \"Not=A?Brand\";v=\"8\", \"Chromium\";v=\"129\"", + "sec-ch-ua": '"Microsoft Edge";v="129", "Not=A?Brand";v="8", "Chromium";v="129"', "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8", "sec-ch-ua-mobile": "?0", "Origin": "https://mooc1.chaoxing.com", "Sec-Fetch-Site": "same-origin", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Dest": "empty", - #"Referer": "https://mooc1.chaoxing.com/mooc-ans/work/doHomeWorkNew?courseId=246831735&workAnswerId=52680423&workId=37778125&api=1&knowledgeid=913820156&classId=107515845&oldWorkId=07647c38d8de4c648a9277c5bed7075a&jobid=work-07647c38d8de4c648a9277c5bed7075a&type=&isphone=false&submit=false&enc=1d826aab06d44a1198fc983ed3d243b1&cpi=338350298&mooc2=1&skipHeader=true&originJobId=work-07647c38d8de4c648a9277c5bed7075a", - "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6,ja;q=0.5" - } + # "Referer": "https://mooc1.chaoxing.com/mooc-ans/work/doHomeWorkNew?courseId=246831735&workAnswerId=52680423&workId=37778125&api=1&knowledgeid=913820156&classId=107515845&oldWorkId=07647c38d8de4c648a9277c5bed7075a&jobid=work-07647c38d8de4c648a9277c5bed7075a&type=&isphone=false&submit=false&enc=1d826aab06d44a1198fc983ed3d243b1&cpi=338350298&mooc2=1&skipHeader=true&originJobId=work-07647c38d8de4c648a9277c5bed7075a", + "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6,ja;q=0.5", + }, ) if res.status_code == 200: res_json = res.json() - if res_json['status']: + if res_json["status"]: logger.info(f'提交答题成功 -> {res_json["msg"]}') else: logger.error(f'提交答题失败 -> {res_json["msg"]}') else: logger.error(f"提交答题失败 -> {res.text}") - def strdy_read(self, _course, _job,_job_info) -> None: + def strdy_read(self, _course, _job, _job_info) -> None: """ - 阅读任务学习,仅完成任务点,并不增长时长 + 阅读任务学习, 仅完成任务点, 并不增长时长 """ _session = init_session() _resp = _session.get( url="https://mooc1.chaoxing.com/ananas/job/readv2", params={ - 'jobid': _job['jobid'], - 'knowledgeid':_job_info['knowledgeid'], - 'jtoken': _job['jtoken'], - 'courseid': _course['courseId'], - 'clazzid': _course['clazzId'] - } + "jobid": _job["jobid"], + "knowledgeid": _job_info["knowledgeid"], + "jtoken": _job["jtoken"], + "courseid": _course["courseId"], + "clazzid": _course["clazzId"], + }, ) if _resp.status_code != 200: logger.error(f"阅读任务学习失败 -> [{_resp.status_code}]{_resp.text}") else: _resp_json = _resp.json() logger.info(f"阅读任务学习 -> {_resp_json['msg']}") - - diff --git a/api/cipher.py b/api/cipher.py index e06e7cdd..efe31acc 100644 --- a/api/cipher.py +++ b/api/cipher.py @@ -6,7 +6,7 @@ def pkcs7_unpadding(string): - return string[0:-ord(string[-1])] + return string[0 : -ord(string[-1])] def pkcs7_padding(s, block_size=16): @@ -29,15 +29,15 @@ def split_to_data_blocks(byte_str, block_size=16): return blocks -class AESCipher(): +class AESCipher: def __init__(self): self.key = str(gc.AESKey).encode("utf8") self.iv = str(gc.AESKey).encode("utf8") def encrypt(self, plaintext: str): - ciphertext = b'' + ciphertext = b"" cbc = pyaes.AESModeOfOperationCBC(self.key, self.iv) - plaintext = plaintext.encode('utf-8') + plaintext = plaintext.encode("utf-8") blocks = split_to_data_blocks(pkcs7_padding(plaintext)) for b in blocks: ciphertext = ciphertext + cbc.encrypt(b) @@ -51,4 +51,4 @@ def encrypt(self, plaintext: str): # ptext = b"" # for b in split_to_data_blocks(ciphertext): # ptext = ptext + cbc.decrypt(b) - # return pkcs7_unpadding(ptext.decode()) \ No newline at end of file + # return pkcs7_unpadding(ptext.decode()) diff --git a/api/config.py b/api/config.py index bee24b85..bc0a9569 100644 --- a/api/config.py +++ b/api/config.py @@ -3,17 +3,17 @@ class GlobalConst: AESKey = "u2oh6Vu^HWe4_AES" HEADERS = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36", - "Sec-Ch-Ua": '"Chromium";v="118", "Google Chrome";v="118", "Not=A?Brand";v="99"' + "Sec-Ch-Ua": '"Chromium";v="118", "Google Chrome";v="118", "Not=A?Brand";v="99"', } COOKIES_PATH = "cookies.txt" VIDEO_HEADERS = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36", "Referer": "https://mooc1.chaoxing.com/ananas/modules/video/index.html?v=2023-1110-1610", - "Host": "mooc1.chaoxing.com" + "Host": "mooc1.chaoxing.com", } AUDIO_HEADERS = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36", "Referer": "https://mooc1.chaoxing.com/ananas/modules/audio/index_new.html?v=2023-0428-1705", - "Host": "mooc1.chaoxing.com" + "Host": "mooc1.chaoxing.com", } - THRESHOLD = 3 \ No newline at end of file + THRESHOLD = 3 diff --git a/api/cookies.py b/api/cookies.py index 1158d8ec..ce99d990 100644 --- a/api/cookies.py +++ b/api/cookies.py @@ -5,12 +5,12 @@ def save_cookies(_session): - with open(gc.COOKIES_PATH, 'wb') as f: + with open(gc.COOKIES_PATH, "wb") as f: pickle.dump(_session.cookies, f) def use_cookies(): if os.path.exists(gc.COOKIES_PATH): - with open(gc.COOKIES_PATH, 'rb') as f: + with open(gc.COOKIES_PATH, "rb") as f: _cookies = pickle.load(f) - return _cookies \ No newline at end of file + return _cookies diff --git a/api/cxsecret_font.py b/api/cxsecret_font.py index f44fb5b7..d66fb856 100644 --- a/api/cxsecret_font.py +++ b/api/cxsecret_font.py @@ -1,7 +1,7 @@ ## # @Author: SocialSisterYi # @Reference: https://github.com/SocialSisterYi/xuexiaoyi-to-xuexitong-tampermonkey-proxy -# +# import base64 import hashlib @@ -24,6 +24,7 @@ class FontHashDAO: """原始字体hashmap DAO""" + char_map: Dict[str, str] # unicode -> hsah hash_map: Dict[str, str] # hash -> unicode diff --git a/api/decode.py b/api/decode.py index 7fe74639..06993d3c 100644 --- a/api/decode.py +++ b/api/decode.py @@ -5,30 +5,42 @@ from api.logger import logger from api.font_decoder import FontDecoder + def decode_course_list(_text): logger.trace("开始解码课程列表...") _soup = BeautifulSoup(_text, "lxml") _raw_courses = _soup.select("div.course") _course_list = list() for course in _raw_courses: - if not course.select_one("a.not-open-tip") and not course.select_one("div.not-open-tip"): + if not course.select_one("a.not-open-tip") and not course.select_one( + "div.not-open-tip" + ): _course_detail = {} _course_detail["id"] = course.attrs["id"] _course_detail["info"] = course.attrs["info"] _course_detail["roleid"] = course.attrs["roleid"] - _course_detail["clazzId"] = course.select_one("input.clazzId").attrs["value"] - _course_detail["courseId"] = course.select_one("input.courseId").attrs["value"] - _course_detail["cpi"] = re.findall(r"cpi=(.*?)&", course.select_one("a").attrs["href"])[0] - _course_detail["title"] = course.select_one("span.course-name").attrs["title"] + _course_detail["clazzId"] = course.select_one("input.clazzId").attrs[ + "value" + ] + _course_detail["courseId"] = course.select_one("input.courseId").attrs[ + "value" + ] + _course_detail["cpi"] = re.findall( + r"cpi=(.*?)&", course.select_one("a").attrs["href"] + )[0] + _course_detail["title"] = course.select_one("span.course-name").attrs[ + "title" + ] if course.select_one("p.margint10") is None: - _course_detail["desc"] = '' + _course_detail["desc"] = "" else: _course_detail["desc"] = course.select_one("p.margint10").attrs["title"] _course_detail["teacher"] = course.select_one("p.color3").attrs["title"] _course_list.append(_course_detail) return _course_list + def decode_course_folder(_text): logger.trace("开始解码二级课程列表...") _soup = BeautifulSoup(_text, "lxml") @@ -38,39 +50,45 @@ def decode_course_folder(_text): if course.attrs["fileid"]: _course_folder_detail = {} _course_folder_detail["id"] = course.attrs["fileid"] - _course_folder_detail["rename"] = course.select_one("input.rename-input").attrs["value"] + _course_folder_detail["rename"] = course.select_one( + "input.rename-input" + ).attrs["value"] _course_folder_list.append(_course_folder_detail) return _course_folder_list + def decode_course_point(_text): logger.trace("开始解码章节列表...") _soup = BeautifulSoup(_text, "lxml") _course_point = { - "hasLocked": False, # 用于判断该课程任务是否是需要解锁 - "points": [] + "hasLocked": False, # 用于判断该课程任务是否是需要解锁 + "points": [], } - - - for _chapter_unit in _soup.find_all("div",class_="chapter_unit") : + + for _chapter_unit in _soup.find_all("div", class_="chapter_unit"): _point_list = [] _raw_points = _chapter_unit.find_all("li") for _point in _raw_points: _point = _point.div - if (not "id" in _point.attrs): + if not "id" in _point.attrs: continue _point_detail = {} _point_detail["id"] = re.findall(r"^cur(\d{1,20})$", _point.attrs["id"])[0] - _point_detail["title"] = _point.select_one("a.clicktitle").text.replace("\n",'').strip(' ') - _point_detail["jobCount"] = 1 # 默认为1 + _point_detail["title"] = ( + _point.select_one("a.clicktitle").text.replace("\n", "").strip(" ") + ) + _point_detail["jobCount"] = 1 # 默认为1 if _point.select_one("input.knowledgeJobCount"): - _point_detail["jobCount"] = _point.select_one("input.knowledgeJobCount").attrs["value"] + _point_detail["jobCount"] = _point.select_one( + "input.knowledgeJobCount" + ).attrs["value"] else: # 判断是不是因为需要解锁 - if '解锁' in _point.select_one("span.bntHoverTips").text: + if "解锁" in _point.select_one("span.bntHoverTips").text: _course_point["hasLocked"] = True - + _point_list.append(_point_detail) - _course_point["points"]+=_point_list + _course_point["points"] += _point_list return _course_point @@ -79,27 +97,27 @@ def decode_course_card(_text: str): _job_info = {} _job_list = [] # 对于未开放章节检测 - if '章节未开放' in _text: - _job_info['notOpen'] = True - return [],_job_info - + if "章节未开放" in _text: + _job_info["notOpen"] = True + return [], _job_info + _temp = re.findall(r"mArg=\{(.*?)\};", _text.replace(" ", "")) if _temp: _temp = _temp[0] else: - return [],{} + return [], {} _cards = json.loads("{" + _temp + "}") - + if _cards: _job_info = {} _job_info["ktoken"] = _cards["defaults"]["ktoken"] _job_info["mtEnc"] = _cards["defaults"]["mtEnc"] - _job_info["reportTimeInterval"] = _cards["defaults"]["reportTimeInterval"] # 60 + _job_info["reportTimeInterval"] = _cards["defaults"]["reportTimeInterval"] # 60 _job_info["defenc"] = _cards["defaults"]["defenc"] _job_info["cardid"] = _cards["defaults"]["cardid"] _job_info["cpi"] = _cards["defaults"]["cpi"] _job_info["qnenc"] = _cards["defaults"]["qnenc"] - _job_info['knowledgeid'] = _cards["defaults"]["knowledgeid"] + _job_info["knowledgeid"] = _cards["defaults"]["knowledgeid"] _cards = _cards["attachments"] _job_list = [] for _card in _cards: @@ -108,21 +126,21 @@ def decode_course_card(_text: str): continue # 不属于任务点的任务 if "job" not in _card or _card["job"] is False: - if _card.get('type') and _card['type'] == "read": + if _card.get("type") and _card["type"] == "read": # 发现有在视频任务下掺杂阅读任务,不完成可能会导致无法开启下一章节 - if _card['property'].get('read',False): + if _card["property"].get("read", False): # 已阅读,跳过 continue _job = {} - _job['title'] = _card['property']['title'] + _job["title"] = _card["property"]["title"] _job["type"] = "read" - _job['id'] = _card['property']['id'] + _job["id"] = _card["property"]["id"] _job["jobid"] = _card["jobid"] _job["jtoken"] = _card["jtoken"] - _job['mid'] = _card['mid'] - _job['otherinfo'] = _card["otherInfo"] - _job['enc'] = _card["enc"] - _job['aid'] = _card["aid"] + _job["mid"] = _card["mid"] + _job["otherinfo"] = _card["otherInfo"] + _job["enc"] = _card["enc"] + _job["aid"] = _card["aid"] _job_list.append(_job) continue # 视频任务 @@ -165,67 +183,67 @@ def decode_course_card(_text: str): _job["aid"] = _card["aid"] _job_list.append(_job) continue - + if _card["type"] == "vote": # 调查问卷 同上 continue return _job_list, _job_info - + def decode_questions_info(html_content) -> dict: def replace_rtn(text): - return text.replace('\r', '').replace('\t', '').replace('\n', '') + return text.replace("\r", "").replace("\t", "").replace("\n", "") soup = BeautifulSoup(html_content, "lxml") form_data = {} form_tag = soup.find("form") fd = FontDecoder(html_content) # 加载字体 - + # 抽取表单信息 for input_tag in form_tag.find_all("input"): - if 'name' not in input_tag.attrs or 'answer' in input_tag.attrs["name"]: + if "name" not in input_tag.attrs or "answer" in input_tag.attrs["name"]: continue - form_data.update({ - input_tag.attrs["name"]: input_tag.attrs.get("value",'') - }) + form_data.update({input_tag.attrs["name"]: input_tag.attrs.get("value", "")}) - form_data['questions'] = [] - for div_tag in form_tag.find_all("div",class_="singleQuesId"): # 目前来说无论是单选还是多选的题class都是这个 + form_data["questions"] = [] + for div_tag in form_tag.find_all( + "div", class_="singleQuesId" + ): # 目前来说无论是单选还是多选的题class都是这个 q_title = replace_rtn(fd.decode(div_tag.find("div", class_="Zy_TItle").text)) - q_options = '' + q_options = "" for li_tag in div_tag.find("ul").find_all("li"): - q_options += replace_rtn(fd.decode(li_tag.text))+'\n' - q_options=q_options[:-1] # 去除尾部'\n' + q_options += replace_rtn(fd.decode(li_tag.text)) + "\n" + q_options = q_options[:-1] # 去除尾部'\n' # 尝试使用 data 属性来判断题型 - q_type_code = div_tag.find('div',class_='TiMu').attrs['data'] - q_type = '' + q_type_code = div_tag.find("div", class_="TiMu").attrs["data"] + q_type = "" # 此处可能需要完善更多题型的判断 - if q_type_code == '0': - q_type = 'single' - elif q_type_code == '1': - q_type = 'multiple' - elif q_type_code == '2': - q_type = 'completion' - elif q_type_code == '3': - q_type = 'judgement' + if q_type_code == "0": + q_type = "single" + elif q_type_code == "1": + q_type = "multiple" + elif q_type_code == "2": + q_type = "completion" + elif q_type_code == "3": + q_type = "judgement" else: - logger.info("未知题型代码 -> "+q_type_code) - q_type = 'unknown' # 避免出现未定义取值错误 - - form_data["questions"].append({ - 'id': div_tag.attrs["data"], - 'title':q_title, # 题目 - 'options':q_options, # 选项 可提供给题库作为辅助 - 'type': q_type, # 题型 可提供给题库作为辅助 - 'answerField':{ - 'answer'+div_tag.attrs["data"]:'', # 答案填入处 - 'answertype'+div_tag.attrs["data"]:q_type_code + logger.info("未知题型代码 -> " + q_type_code) + q_type = "unknown" # 避免出现未定义取值错误 + + form_data["questions"].append( + { + "id": div_tag.attrs["data"], + "title": q_title, # 题目 + "options": q_options, # 选项 可提供给题库作为辅助 + "type": q_type, # 题型 可提供给题库作为辅助 + "answerField": { + "answer" + div_tag.attrs["data"]: "", # 答案填入处 + "answertype" + div_tag.attrs["data"]: q_type_code, + }, } - }) + ) # 处理答题信息 - form_data['answerwqbid'] = ",".join([q['id'] for q in form_data['questions']])+"," + form_data["answerwqbid"] = ",".join([q["id"] for q in form_data["questions"]]) + "," return form_data - - diff --git a/api/exceptions.py b/api/exceptions.py index 913e71ee..57e00b5c 100644 --- a/api/exceptions.py +++ b/api/exceptions.py @@ -13,7 +13,7 @@ class FormatError(Exception): def __init__(self, *args: object): super().__init__(*args) + class MaxRollBackError(Exception): def __init__(self, *args: object): super().__init__(*args) - \ No newline at end of file diff --git a/api/font_decoder.py b/api/font_decoder.py index 1742c04d..5d0ca4ab 100644 --- a/api/font_decoder.py +++ b/api/font_decoder.py @@ -4,19 +4,20 @@ class FontDecoder: - def __init__(self,html_content:str=None): + def __init__(self, html_content: str = None): self.html_content = html_content # self.__isNeedDecode = True self.__font_hash_map = None self.__decode_init(html_content) - + def __decode_init(self, html_content): if html_content: soup = BeautifulSoup(html_content, "lxml") - style_tag = soup.find("style",id="cxSecretStyle") - match = re.search(r'base64,([\w\W]+?)\'', style_tag.text) - self.__font_hash_map = cxfont.font2map('data:application/font-ttf;charset=utf-8;base64,'+match.group(1)) + style_tag = soup.find("style", id="cxSecretStyle") + match = re.search(r"base64,([\w\W]+?)\'", style_tag.text) + self.__font_hash_map = cxfont.font2map( + "data:application/font-ttf;charset=utf-8;base64," + match.group(1) + ) - def decode(self,target_str:str) -> str: + def decode(self, target_str: str) -> str: return cxfont.decrypt(self.__font_hash_map, target_str) - diff --git a/api/logger.py b/api/logger.py index 2caa5689..ba1e61df 100644 --- a/api/logger.py +++ b/api/logger.py @@ -1,3 +1,3 @@ from loguru import logger -logger.add("chaoxing.log", rotation="10 MB", level="TRACE") \ No newline at end of file +logger.add("chaoxing.log", rotation="10 MB", level="TRACE") diff --git a/api/process.py b/api/process.py index 7e5d9cba..309e5745 100644 --- a/api/process.py +++ b/api/process.py @@ -1,15 +1,16 @@ import time from api.config import GlobalConst as gc + def sec2time(sec: int): h = int(sec / 3600) m = int(sec % 3600 / 60) s = int(sec % 60) if h != 0: - return f'{h}:{m:02}:{s:02}' + return f"{h}:{m:02}:{s:02}" if sec != 0: - return f'{m:02}:{s:02}' - return '--:--' + return f"{m:02}:{s:02}" + return "--:--" def show_progress(name: str, start: int, span: int, total: int, _speed: float): @@ -20,5 +21,9 @@ def show_progress(name: str, start: int, span: int, total: int, _speed: float): length = int(percent * 40 // 100) progress = ("#" * length).ljust(40, " ") # remain = (total - current) - print(f"\r当前任务: {name} |{progress}| {percent}% {sec2time(current)}/{sec2time(total)}", end="", flush=True) - time.sleep(gc.THRESHOLD) \ No newline at end of file + print( + f"\r当前任务: {name} |{progress}| {percent}% {sec2time(current)}/{sec2time(total)}", + end="", + flush=True, + ) + time.sleep(gc.THRESHOLD) diff --git a/app.py b/app.py index 486af05a..ba72eb4c 100644 --- a/app.py +++ b/app.py @@ -15,7 +15,7 @@ def __call__(self, *args: object, **kwargs: object) -> object: return celery_app -if __name__ == '__main__': +if __name__ == "__main__": app = Flask(__name__) app.config.from_mapping( CELERY=dict( @@ -24,4 +24,4 @@ def __call__(self, *args: object, **kwargs: object) -> object: task_ignore_result=True, ), ) - celery_app = celery_init_app(app) \ No newline at end of file + celery_app = celery_init_app(app) diff --git a/main.py b/main.py index d97cbf17..baa69898 100644 --- a/main.py +++ b/main.py @@ -1,21 +1,25 @@ # -*- coding: utf-8 -*- import argparse import configparser +import random + from api.logger import logger from api.base import Chaoxing, Account -from api.exceptions import LoginError, FormatError, JSONDecodeError,MaxRollBackError +from api.exceptions import LoginError, FormatError, JSONDecodeError, MaxRollBackError from api.answer import Tiku -from urllib3 import disable_warnings,exceptions +from urllib3 import disable_warnings, exceptions +import time +import sys import os -# # 定义全局变量,用于存储配置文件路径 +# # 定义全局变量, 用于存储配置文件路径 # textPath = './resource/BookID.txt' # # 获取文本 -> 用于查看学习过的课程ID # def getText(): -# try: +# try: # if not os.path.exists(textPath): -# with open(textPath, 'x') as file: pass +# with open(textPath, 'x') as file: pass # return [] # with open(textPath, 'r', encoding='utf-8') as file: content = file.read().split(',') # content = {int(item.strip()) for item in content if item.strip()} @@ -25,68 +29,108 @@ # # 追加文本 -> 用于记录学习过的课程ID # def appendText(text): # if not os.path.exists(textPath): return -# with open(textPath, 'a', encoding='utf-8') as file: file.write(f'{text}, ') - +# with open(textPath, 'a', encoding='utf-8') as file: file.write(f'{text}, ') + # 关闭警告 disable_warnings(exceptions.InsecureRequestWarning) + def init_config(): - parser = argparse.ArgumentParser(description='Samueli924/chaoxing') # 命令行传参 - parser.add_argument("-c", "--config", type=str, default=None, help="使用配置文件运行程序") + parser = argparse.ArgumentParser( + description="Samueli924/chaoxing", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + + parser.add_argument( + "-c", "--config", type=str, default=None, help="使用配置文件运行程序" + ) parser.add_argument("-u", "--username", type=str, default=None, help="手机号账号") parser.add_argument("-p", "--password", type=str, default=None, help="登录密码") - parser.add_argument("-l", "--list", type=str, default=None, help="要学习的课程ID列表") - parser.add_argument("-s", "--speed", type=float, default=1.0, help="视频播放倍速(默认1,最大2)") + parser.add_argument( + "-l", "--list", type=str, default=None, help="要学习的课程ID列表, 以 , 分隔" + ) + parser.add_argument( + "-s", "--speed", type=float, default=1.0, help="视频播放倍速 (默认1, 最大2)" + ) + parser.add_argument( + "-v", + "--verbose", + "--debug", + action="store_true", + help="启用调试模式, 输出DEBUG级别日志", + ) + + # 在解析之前捕获 -h 的行为 + if len(sys.argv) == 2 and sys.argv[1] in {"-h", "--help"}: + parser.print_help() + # 返回一个 SystemExit 异常, 用于退出程序 + raise SystemExit + + # 提前检查 -h 和 --help 并退出 args = parser.parse_args() + if args.config: config = configparser.ConfigParser() config.read(args.config, encoding="utf8") - return (config.get("common", "username"), - config.get("common", "password"), - str(config.get("common", "course_list")).split(",") if config.get("common", "course_list") else None, - int(config.get("common", "speed")), - config['tiku'] - ) + return ( + config.get("common", "username"), + config.get("common", "password"), + ( + str(config.get("common", "course_list")).split(",") + if config.get("common", "course_list") + else None + ), + int(config.get("common", "speed")), + config["tiku"], + ) else: - return (args.username, args.password, args.list.split(",") if args.list else None, int(args.speed) if args.speed else 1,None) + return ( + args.username, + args.password, + args.list.split(",") if args.list else None, + int(args.speed) if args.speed else 1, + None, + ) + class RollBackManager: def __init__(self) -> None: self.rollback_times = 0 self.rollback_id = "" - def add_times(self,id:str) -> None: + def add_times(self, id: str) -> None: if id == self.rollback_id and self.rollback_times == 3: - raise MaxRollBackError("回滚次数已达3次,请手动检查学习通任务点完成情况") + raise MaxRollBackError("回滚次数已达3次, 请手动检查学习通任务点完成情况") elif id != self.rollback_id: # 新job self.rollback_id = id self.rollback_times = 1 - else: + else: self.rollback_times += 1 -if __name__ == '__main__': +if __name__ == "__main__": try: # 避免异常的无限回滚 RB = RollBackManager() # 初始化登录信息 - username, password, course_list, speed,tiku_config= init_config() + username, password, course_list, speed, tiku_config = init_config() # 规范化播放速度的输入值 speed = min(2.0, max(1.0, speed)) if (not username) or (not password): - username = input("请输入你的手机号,按回车确认\n手机号:") - password = input("请输入你的密码,按回车确认\n密码:") + username = input("请输入你的手机号, 按回车确认\n手机号:") + password = input("请输入你的密码, 按回车确认\n密码:") account = Account(username, password) # 设置题库 tiku = Tiku() - tiku.config_set(tiku_config) # 载入配置 + tiku.config_set(tiku_config) # 载入配置 tiku = tiku.get_tiku_from_config() # 载入题库 - tiku.init_tiku() # 初始化题库 + tiku.init_tiku() # 初始化题库 + # 实例化超星API - chaoxing = Chaoxing(account=account,tiku=tiku) - # 检查当前登录状态,并检查账号密码 + chaoxing = Chaoxing(account=account, tiku=tiku) + # 检查当前登录状态, 并检查账号密码 _login_state = chaoxing.login() if not _login_state["status"]: raise LoginError(_login_state["msg"]) @@ -100,7 +144,9 @@ def add_times(self,id:str) -> None: print(f"ID: {course['courseId']} 课程名: {course['title']}") print("*" * 28) try: - course_list = input("请输入想要学习的课程列表,以逗号分隔,例: 2151141,189191,198198\n").split(",") + course_list = input( + "请输入想要学习的课程列表,以逗号分隔,例: 2151141,189191,198198\n" + ).split(",") except Exception as e: raise FormatError("输入格式错误") from e # 筛选需要学习的课程 @@ -110,41 +156,50 @@ def add_times(self,id:str) -> None: if not course_task: course_task = all_course # 开始遍历要学习的课程列表 - logger.info(f"课程列表过滤完毕,当前课程任务数量: {len(course_task)}") + logger.info(f"课程列表过滤完毕, 当前课程任务数量: {len(course_task)}") for course in course_task: logger.info(f"开始学习课程: {course['title']}") # 获取当前课程的所有章节 - point_list = chaoxing.get_course_point(course["courseId"], course["clazzId"], course["cpi"]) + point_list = chaoxing.get_course_point( + course["courseId"], course["clazzId"], course["cpi"] + ) - # 为了支持课程任务回滚,采用下标方式遍历任务点 + # 为了支持课程任务回滚, 采用下标方式遍历任务点 __point_index = 0 while __point_index < len(point_list["points"]): point = point_list["points"][__point_index] logger.info(f'当前章节: {point["title"]}') + logger.debug(f"当前章节 __point_index: {__point_index}") # 触发参数: -v + sleep_duration = random.uniform(1, 3) + logger.debug(f"本次随机等待时间: {sleep_duration}") + time.sleep(sleep_duration) # 避免请求过快导致异常, 所以引入随机sleep # 获取当前章节的所有任务点 jobs = [] job_info = None - jobs, job_info = chaoxing.get_job_list(course["clazzId"], course["courseId"], course["cpi"], point["id"]) - + jobs, job_info = chaoxing.get_job_list( + course["clazzId"], course["courseId"], course["cpi"], point["id"] + ) + # bookID = job_info["knowledgeid"] # 获取视频ID - - # 发现未开放章节,尝试回滚上一个任务重新完成一次 + + # 发现未开放章节, 尝试回滚上一个任务重新完成一次 try: - if job_info.get('notOpen',False): + if job_info.get("notOpen", False): __point_index -= 1 # 默认第一个任务总是开放的 # 针对题库启用情况 if not tiku or tiku.DISABLE or not tiku.SUBMIT: - # 未启用题库或未开启题库提交,章节检测未完成会导致无法开始下一章,直接退出 - logger.error(f"章节未开启,可能由于上一章节的章节检测未完成,请手动完成并提交再重试,或者开启题库并启用提交") + # 未启用题库或未开启题库提交, 章节检测未完成会导致无法开始下一章, 直接退出 + logger.error( + f"章节未开启, 可能由于上一章节的章节检测未完成, 请手动完成并提交再重试, 或者开启题库并启用提交" + ) break RB.add_times(point["id"]) continue except MaxRollBackError as e: - logger.error("回滚次数已达3次,请手动检查学习通任务点完成情况") - # 跳过该课程,继续下一课程 + logger.error("回滚次数已达3次, 请手动检查学习通任务点完成情况") + # 跳过该课程, 继续下一课程 break - # 可能存在章节无任何内容的情况 if not jobs: __point_index += 1 @@ -153,42 +208,59 @@ def add_times(self,id:str) -> None: for job in jobs: # 视频任务 if job["type"] == "video": - # TODO: 目前这个记录功能还不够完善,中途退出的课程ID也会被记录 + # TODO: 目前这个记录功能还不够完善, 中途退出的课程ID也会被记录 # TextBookID = getText() # 获取学习过的课程ID - # if TextBookID.count(bookID) > 0: - # logger.info(f"课程: {course['title']} 章节: {point['title']} 任务: {job['title']} 已学习过或在学习中,跳过") # 如果已经学习过该课程,则跳过 - # break # 如果已经学习过该课程,则跳过 + # if TextBookID.count(bookID) > 0: + # logger.info(f"课程: {course['title']} 章节: {point['title']} 任务: {job['title']} 已学习过或在学习中, 跳过") # 如果已经学习过该课程, 则跳过 + # break # 如果已经学习过该课程, 则跳过 # appendText(bookID) # 记录正在学习的课程ID - logger.trace(f"识别到视频任务, 任务章节: {course['title']} 任务ID: {job['jobid']}") + logger.trace( + f"识别到视频任务, 任务章节: {course['title']} 任务ID: {job['jobid']}" + ) # 超星的接口没有返回当前任务是否为Audio音频任务 isAudio = False try: - chaoxing.study_video(course, job, job_info, _speed=speed, _type="Video") + chaoxing.study_video( + course, job, job_info, _speed=speed, _type="Video" + ) except JSONDecodeError as e: - logger.warning("当前任务非视频任务,正在尝试音频任务解码") + logger.warning("当前任务非视频任务, 正在尝试音频任务解码") isAudio = True if isAudio: try: - chaoxing.study_video(course, job, job_info, _speed=speed, _type="Audio") + chaoxing.study_video( + course, job, job_info, _speed=speed, _type="Audio" + ) except JSONDecodeError as e: - logger.warning(f"出现异常任务 -> 任务章节: {course['title']} 任务ID: {job['jobid']}, 已跳过") + logger.warning( + f"出现异常任务 -> 任务章节: {course['title']} 任务ID: {job['jobid']}, 已跳过" + ) # 文档任务 elif job["type"] == "document": - logger.trace(f"识别到文档任务, 任务章节: {course['title']} 任务ID: {job['jobid']}") + logger.trace( + f"识别到文档任务, 任务章节: {course['title']} 任务ID: {job['jobid']}" + ) chaoxing.study_document(course, job) # 测验任务 elif job["type"] == "workid": logger.trace(f"识别到章节检测任务, 任务章节: {course['title']}") - chaoxing.study_work(course, job,job_info) + chaoxing.study_work(course, job, job_info) # 阅读任务 elif job["type"] == "read": logger.trace(f"识别到阅读任务, 任务章节: {course['title']}") - chaoxing.strdy_read(course, job,job_info) + chaoxing.strdy_read(course, job, job_info) __point_index += 1 logger.info("所有课程学习任务已完成") + + except SystemExit as e: + if e.code == 0: # 正常退出 + sys.exit(0) + else: + raise except BaseException as e: import traceback + logger.error(f"错误: {type(e).__name__}: {e}") logger.error(traceback.format_exc()) - raise e \ No newline at end of file + raise e