Skip to content

Commit

Permalink
内置人机验证助手(滑块助手)
Browse files Browse the repository at this point in the history
Co-Authored-By: 赵怡然 <[email protected]>
  • Loading branch information
lunzhiPenxil and zhaodice committed Jun 7, 2024
1 parent e9565ae commit 89cf0cb
Show file tree
Hide file tree
Showing 5 changed files with 217 additions and 17 deletions.
33 changes: 33 additions & 0 deletions OlivOS/core/boot/bootAPI.py
Original file line number Diff line number Diff line change
Expand Up @@ -890,6 +890,39 @@ def start(self):
)
if model_name not in Proc_Proc_dict:
Proc_Proc_dict[model_name] = Proc_dict[model_name].start_unity('processing')
elif rx_packet_data.action == 'init_type_open_tx_turingTest_webview_page':
if platform.system() == 'Windows':
if type(rx_packet_data.key) is dict \
and 'target' in rx_packet_data.key \
and type(rx_packet_data.key['target']) is dict \
and 'data' in rx_packet_data.key \
and type(rx_packet_data.key['data']) is dict \
and 'action' in rx_packet_data.key['target'] \
and 'name' in rx_packet_data.key['target'] \
and 'title' in rx_packet_data.key['data'] \
and 'url' in rx_packet_data.key['data']:
if 'init' == rx_packet_data.key['target']['action']:
for basic_conf_models_this_key in basic_conf_models:
basic_conf_models_this = basic_conf_models[basic_conf_models_this_key]
if 'tx_turingTest_webview_page' == basic_conf_models_this['type']:
if basic_conf_models_this['name'] not in Proc_dict:
model_name = '%s+%s' % (
basic_conf_models_this['name'],
rx_packet_data.key['target']['name']
)
Proc_dict[model_name] = OlivOS.libEXEModelAPI.txTuringTestPage(
Proc_name=model_name,
scan_interval=basic_conf_models_this['interval'],
dead_interval=basic_conf_models_this['dead_interval'],
rx_queue=None,
tx_queue=None,
control_queue=multiprocessing_dict[basic_conf_models_this['control_queue']],
logger_proc=Proc_dict[basic_conf_models_this['logger_proc']],
title=rx_packet_data.key['data']['title'],
url=rx_packet_data.key['data']['url']
)
if model_name not in Proc_Proc_dict:
Proc_Proc_dict[model_name] = Proc_dict[model_name].start_unity('processing')
elif rx_packet_data.action == 'call_system_event':
if type(rx_packet_data.key) is dict \
and 'action' in rx_packet_data.key \
Expand Down
10 changes: 10 additions & 0 deletions OlivOS/core/boot/bootDataAPI.py
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,16 @@
"control_queue": "OlivOS_control_queue",
"debug": False
},
"OlivOS_tx_turingTest_webview_page": {
"enable": True,
"name": "OlivOS_tx_turingTest_webview_page",
"type": "tx_turingTest_webview_page",
"interval": 0.2,
"dead_interval": 1,
"logger_proc": "OlivOS_logger",
"control_queue": "OlivOS_control_queue",
"debug": False
},
"OlivOS_update_get": {
"enable": True,
"name": "OlivOS_update_get",
Expand Down
4 changes: 2 additions & 2 deletions OlivOS/core/info/infoAPI.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
import OlivOS


OlivOS_Version = '0.11.33'
OlivOS_SVN = 153
OlivOS_Version = '0.11.34'
OlivOS_SVN = 154

# Compatible <= Plugin[compatible_svn] : Compatible
# OldCompatible <= Plugin[compatible_svn] < Compatible : OldCompatible Warn
Expand Down
140 changes: 140 additions & 0 deletions OlivOS/libBooter/libEXEModelAPI.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import hashlib
import platform
import shutil
import webview

import OlivOS

Expand Down Expand Up @@ -1173,3 +1174,142 @@ def getRandomStringRange(length:int, string:str):
def releaseDir(dir_path):
if not os.path.exists(dir_path):
os.makedirs(dir_path)


def setGoCqhttpTokenSend(
control_queue,
hash,
token
):
if control_queue is not None:
control_queue.put(
OlivOS.API.Control.packet('send', {
'target': {
'type': 'nativeWinUI'
},
'data': {
'action': 'gocqhttp',
'event': 'token_get',
'token': token,
'hash': hash,
}
}
),
block=False
)

def sendOpentxTuringTestPage(
control_queue,
name:str,
title:str,
url:str
):
if control_queue is not None:
control_queue.put(
OlivOS.API.Control.packet(
'init_type_open_tx_turingTest_webview_page',
{
'target': {
'action': 'init',
'name': name
},
'data': {
'title': title,
'url': url
}
}
),
block=False
)

def sendStopTxTuringTestPage(control_queue, Proc_name):
if control_queue is not None:
control_queue.put(
OlivOS.API.Control.packet('stop', Proc_name),
block=False
)

def txTuringTest_evaluate_js(window):
window.evaluate_js(
r"""
mqq.invoke = function(a,b,c){return pywebview.api.invoke(b, JSON.stringify(c))};
"""
)

class txTuringTestApi(object):
def __init__(self, Proc_name, hash, control_queue):
self.Proc_name = Proc_name
self.hash = hash
self.control_queue = control_queue

def invoke(self, b, c):
data = json.loads(c)
if type(data) is dict \
and 'ticket' in data \
and type(data['ticket']) is str:
setGoCqhttpTokenSend(
self.control_queue,
self.hash,
data['ticket']
)
sendStopTxTuringTestPage(
self.control_queue,
self.Proc_name
)

class txTuringTestPage(OlivOS.API.Proc_templet):
def __init__(
self,
Proc_name='tx_turingTest_webview_page',
scan_interval=0.001,
dead_interval=1,
rx_queue=None,
tx_queue=None,
logger_proc=None,
control_queue=None,
title='OlivOS Turing Test Page',
url=None
):
OlivOS.API.Proc_templet.__init__(
self,
Proc_name=Proc_name,
Proc_type='tx_turingTest_webview_page',
scan_interval=scan_interval,
dead_interval=dead_interval,
rx_queue=rx_queue,
tx_queue=tx_queue,
control_queue=control_queue,
logger_proc=logger_proc
)
self.UIObject = {}
self.UIData = {
'title': title,
'url': url,
'control_queue': control_queue
}

def run(self):
releaseDir('./data')
releaseDir('./data/webview')
releaseDir('./data/webview/%s' % self.Proc_name)
if self.UIData['url'] != None:
txTuringTestApi_obj = txTuringTestApi(
self.Proc_name,
self.Proc_name.split('=')[-1],
self.UIData['control_queue'],
)
window = webview.create_window(
title=self.UIData['title'],
url=self.UIData['url'],
background_color='#00A0EA',
js_api=txTuringTestApi_obj
)
webview.start(
txTuringTest_evaluate_js,
window,
private_mode=False,
storage_path='./data/webview/%s' % self.Proc_name,
)

# 发送并等待结束
sendStopTxTuringTestPage(self.Proc_info.control_queue, self.Proc_name)
47 changes: 32 additions & 15 deletions OlivOS/nativeGUI/nativeWinUIAPI.py
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,14 @@ def mainrun(self):
path=rx_packet_data.key['data']['path']
)
self.UIObject['root_qrcode_window'][hash].start()
elif 'token_get' == rx_packet_data.key['data']['event']:
if 'hash' in rx_packet_data.key['data'] \
and 'token' in rx_packet_data.key['data']:
hash = rx_packet_data.key['data']['hash']
self.setGoCqhttpModelSend(
hash=rx_packet_data.key['data']['hash'],
data=rx_packet_data.key['data']['token']
)
elif 'gocqhttp_terminal_on' == rx_packet_data.key['data']['event']:
if 'hash' in rx_packet_data.key['data']:
self.startGoCqhttpTerminalUI(rx_packet_data.key['data']['hash'])
Expand Down Expand Up @@ -1224,11 +1232,14 @@ def resFunc(event):

def root_Entry_enter(self, name, event):
if name == 'root_input':
input = self.UIData['root_input_StringVar'].get()
if len(input) >= 0 and len(input) < 1000:
self.root.setGoCqhttpModelSend(self.bot.hash, input)
input_data = self.UIData['root_input_StringVar'].get()
if len(input_data) >= 0 and len(input_data) < 1000:
self.root_setGoCqhttpModelSend(input_data)
self.UIData['root_input_StringVar'].set('')

def root_setGoCqhttpModelSend(self, input_data):
self.root.setGoCqhttpModelSend(self.bot.hash, input_data)

def root_Entry_init(self, obj_root, obj_name, str_name, x, y, width_t, width, height, action, title='',
mode='NONE'):
self.UIObject[obj_name + '=Label'] = tkinter.Label(
Expand Down Expand Up @@ -1270,19 +1281,17 @@ def root_Entry_init(self, obj_root, obj_name, str_name, x, y, width_t, width, he
# )

def show_url_webbrowser(self, url):
res = tkinter.messagebox.askquestion("请完成验证", "是否通过浏览器访问 \"" + url + "\" ?")
res = tkinter.messagebox.askquestion("请完成验证", "是否使用内置人机验证助手访问 \"" + url + "\" ?")
try:
if res == 'yes':
res = tkinter.messagebox.askquestion("请完成验证", "是否使用内置浏览器?")
if res == 'yes':
OlivOS.webviewUIAPI.sendOpenWebviewPage(
control_queue=self.root.Proc_info.control_queue,
name='slider_verification_code=%s' % self.bot.hash,
title='请完成验证',
url=url
)
else:
webbrowser.open(url)
OlivOS.libEXEModelAPI.sendOpentxTuringTestPage(
control_queue=self.root.Proc_info.control_queue,
name='slider_verification_code=%s' % self.bot.hash,
title='请完成验证',
url=url
)
else:
webbrowser.open(url)
except webbrowser.Error as error_info:
tkinter.messagebox.showerror("webbrowser.Error", error_info)

Expand Down Expand Up @@ -1315,7 +1324,15 @@ def tree_add_line(self, data, flagInit = False):
if not flagInit and platform.system() == 'Windows':
try:
matchRes = re.match(
r'^\[\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}\]\s\[WARNING\]:\s请前往该地址验证\s+->\s+(http[s]{0,1}://captcha\.go-cqhttp\.org/captcha\?[^\s]+).*$',
r'^\[\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}\]\s\[WARNING\]:\s请选择提交滑块ticket方式:.*$',
res_data_raw
)
if matchRes != None:
self.tree_add_line('=================================================================')
self.tree_add_line(' 【推荐】 选择手动抓取ticket方式将会允许OlivOS接管验证流程 【推荐】')
self.tree_add_line('=================================================================')
matchRes = re.match(
r'^\[\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2}\]\s\[WARNING\]:\s请前往该地址验证\s+->\s+(http[s]{0,1}://ti\.qq\.com/safe/tools/captcha/sms-verify-login\?[^\s]+).*$',
res_data_raw
)
if matchRes != None:
Expand Down

0 comments on commit 89cf0cb

Please sign in to comment.