From f20ed92b89c927afa66e7b0d3964a2bcb307bf5a Mon Sep 17 00:00:00 2001 From: ancoka Date: Mon, 18 Nov 2024 22:03:04 +0800 Subject: [PATCH] =?UTF-8?q?refact(huawei.py):=20=E6=94=AF=E6=8C=81Mate?= =?UTF-8?q?=E7=B3=BB=E5=88=97=E7=AD=89=E6=9C=80=E6=96=B0=E6=9C=BA=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 支持 Mate XT 非凡大师 - 支持 Mate 70 最新机型 --- .gitignore | 1 + README.md | 3 +- browser/browser.py | 1 - browser/browser_factory.py | 11 +- browser/chrome.py | 14 +-- browser/edge.py | 14 +-- browser/firefox.py | 12 +-- browser/safari.py | 12 +-- config.py | 3 + huawei.py | 213 +++++++++++++++++-------------------- huawei_thread.py | 8 +- main.py | 31 ++---- tools/my_logger.py | 24 +++++ tools/time_utils.py | 106 ++++++++++++++++++ tools/utils.py | 22 ++++ utils.py | 82 -------------- 16 files changed, 300 insertions(+), 257 deletions(-) create mode 100644 tools/my_logger.py create mode 100644 tools/time_utils.py create mode 100644 tools/utils.py delete mode 100644 utils.py diff --git a/.gitignore b/.gitignore index d4bb00b..938fa62 100644 --- a/.gitignore +++ b/.gitignore @@ -24,4 +24,5 @@ html **/*.log hw_cookies.txt profiles/* +__pycache__ diff --git a/README.md b/README.md index c8be60b..74e72a7 100644 --- a/README.md +++ b/README.md @@ -40,8 +40,7 @@ - [x] 无窗口模式支持 - [x] 多浏览器支持(chrome、firefox、edge、safari) - [x] 多线程支持 -- [ ] 京东APP抢购(规划中) -- [ ] 下单成功通知功能(待实现) +- [x] 支持 Mate XT 非凡大师、Mate 70 系列等最新机型 ## 运行环境 请安装大于等于python 3.6 的版本及同浏览器版本匹配的浏览器驱动运行此项目 diff --git a/browser/browser.py b/browser/browser.py index 7e64522..68096ef 100644 --- a/browser/browser.py +++ b/browser/browser.py @@ -7,7 +7,6 @@ class Browser(ABC): - @abstractmethod def setting(self, config: Config = None, log_path: str = ""): pass diff --git a/browser/browser_factory.py b/browser/browser_factory.py index 439cb0c..c372c8d 100644 --- a/browser/browser_factory.py +++ b/browser/browser_factory.py @@ -9,13 +9,14 @@ from browser.safari import SafariBrowser from loguru import logger + class BrowserFactory: @staticmethod - def build(browserType): - supportBrowsers = ['chrome', 'firefox', 'edge', 'safari'] - if browserType not in supportBrowsers: - logger.info("不支持的浏览器类型,浏览器类型为:{}", browserType) + def build(browser_type): + support_browsers = ['chrome', 'firefox', 'edge', 'safari'] + if browser_type not in support_browsers: + logger.info("不支持的浏览器类型,浏览器类型为:{}", browser_type) exit() - return eval(browserType.title() + "Browser")() + return eval(browser_type.title() + "Browser")() diff --git a/browser/chrome.py b/browser/chrome.py index bdcedb2..1d45755 100644 --- a/browser/chrome.py +++ b/browser/chrome.py @@ -9,17 +9,17 @@ class ChromeBrowser(Browser): - def setting(self, config: Config = None, log_path: str = "", userDataDir: str = ""): + def setting(self, config: Config = None, log_path: str = "", user_data_dir: str = ""): options = webdriver.ChromeOptions() - options.add_argument(r"--user-data-dir={}".format(userDataDir)) + options.add_argument(r"--user-data-dir={}".format(user_data_dir)) options.add_argument(r"--profile-directory={}".format("Default")) if config.getboolean("browser", "headless", False): options.add_argument('--headless') # headless 模式下需要设置user_agent及窗口大小,否则会被识别成移动端访问 - defaultUserAgent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36" - userAgent = config.get("browser", "userAgent", defaultUserAgent) - options.add_argument(r"user-agent={}".format(userAgent)) + default_user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36" + user_agent = config.get("browser", "userAgent", default_user_agent) + options.add_argument(r"user-agent={}".format(user_agent)) options.add_argument("--window-size=1920,1080") options.add_argument('--ignore-certificate-errors') @@ -32,8 +32,8 @@ def setting(self, config: Config = None, log_path: str = "", userDataDir: str = options.add_argument('--disable-dev-shm-usage') options.add_argument('--no-sandbox') - driverPath = config.get("browser", "driverPath", '') - executable_path = None if len(driverPath) < 1 else driverPath + driver_path = config.get("browser", "driverPath", '') + executable_path = None if len(driver_path) < 1 else driver_path browser = webdriver.Chrome(service=webdriver.ChromeService(executable_path=executable_path, log_path=log_path), options=options) return browser diff --git a/browser/edge.py b/browser/edge.py index 57e5401..32ed7e7 100644 --- a/browser/edge.py +++ b/browser/edge.py @@ -9,17 +9,17 @@ class EdgeBrowser(Browser): - def setting(self, config: Config = None, log_path: str = "", userDataDir: str = ""): + def setting(self, config: Config = None, log_path: str = "", user_data_dir: str = ""): options = webdriver.EdgeOptions() - options.add_argument(r"--user-data-dir={}".format(userDataDir)) + options.add_argument(r"--user-data-dir={}".format(user_data_dir)) options.add_argument(r"--profile-directory={}".format("Default")) if config.getboolean("browser", "headless", False): options.add_argument('--headless') # headless 模式下需要设置user_agent及窗口大小,否则会被识别成移动端访问 - defaultUserAgent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36 Edg/117.0.2045.55" - userAgent = config.get("browser", "userAgent", defaultUserAgent) - options.add_argument(r"user-agent={}".format(userAgent)) + default_user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/117.0.0.0 Safari/537.36 Edg/117.0.2045.55" + user_agent = config.get("browser", "userAgent", default_user_agent) + options.add_argument(r"user-agent={}".format(user_agent)) options.add_argument("--window-size=1920,1080") options.add_argument('--ignore-certificate-errors') @@ -32,8 +32,8 @@ def setting(self, config: Config = None, log_path: str = "", userDataDir: str = options.add_argument('--disable-dev-shm-usage') options.add_argument('--no-sandbox') - driverPath = config.get("browser", "driverPath", '') - executable_path = None if len(driverPath) < 1 else driverPath + driver_path = config.get("browser", "driverPath", '') + executable_path = None if len(driver_path) < 1 else driver_path browser = webdriver.Edge(service=webdriver.EdgeService(executable_path=executable_path, log_path=log_path), options=options) return browser diff --git a/browser/firefox.py b/browser/firefox.py index 554bc43..3d5bbc9 100644 --- a/browser/firefox.py +++ b/browser/firefox.py @@ -8,20 +8,20 @@ class FirefoxBrowser(Browser): - def setting(self, config: Config = None, log_path: str = "", userDataDir: str = ""): + def setting(self, config: Config = None, log_path: str = "", user_data_dir: str = ""): options = webdriver.FirefoxOptions() if config.getboolean("browser", "headless", False): options.add_argument('--headless') - defaultUserAgent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/118.0" - userAgent = config.get("browser", "userAgent", defaultUserAgent) - options.set_preference("general.useragent.override", userAgent) + default_user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:109.0) Gecko/20100101 Firefox/118.0" + user_agent = config.get("browser", "userAgent", default_user_agent) + options.set_preference("general.useragent.override", user_agent) # options.add_argument(r"--profile-root={}".format(os.path.dirname(os.path.abspath(__file__)))) # options.add_argument(r"-profile={}".format("profiles")) - driverPath = config.get("browser", "driverPath", '') - executable_path = None if len(driverPath) < 1 else driverPath + driver_path = config.get("browser", "driverPath", '') + executable_path = None if len(driver_path) < 1 else driver_path browser = webdriver.Firefox( service=webdriver.FirefoxService(executable_path=executable_path, log_path=log_path), options=options) diff --git a/browser/safari.py b/browser/safari.py index 80a7ff1..98787c3 100644 --- a/browser/safari.py +++ b/browser/safari.py @@ -8,14 +8,14 @@ class SafariBrowser(Browser): - def setting(self, config: Config = None, log_path: str = "", userDataDir: str = ""): + def setting(self, config: Config = None, log_path: str = "", user_data_dir: str = ""): options = webdriver.SafariOptions() if config.getboolean("browser", "headless", False): options.add_argument('--headless') - defaultUserAgent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15" - userAgent = config.get("browser", "userAgent", defaultUserAgent) - options.add_argument(r"--user-agent={}".format(userAgent)) + default_user_agent = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.0 Safari/605.1.15" + user_agent = config.get("browser", "userAgent", default_user_agent) + options.add_argument(r"--user-agent={}".format(user_agent)) options.add_argument("--window-size=1920,1080") options.add_argument('--ignore-certificate-errors') @@ -27,8 +27,8 @@ def setting(self, config: Config = None, log_path: str = "", userDataDir: str = options.add_argument('--disable-dev-shm-usage') options.add_argument('--no-sandbox') - driverPath = config.get("browser", "driverPath", '') - executable_path = None if len(driverPath) < 1 else driverPath + driver_path = config.get("browser", "driverPath", '') + executable_path = None if len(driver_path) < 1 else driver_path browser = webdriver.Safari(service=webdriver.SafariService(executable_path=executable_path, log_path=log_path, service_args=["--diagnose"]), options=options) diff --git a/config.py b/config.py index ee8cd6a..e0d2e65 100644 --- a/config.py +++ b/config.py @@ -2,14 +2,17 @@ # !/usr/bin/python from configparser import ConfigParser +from loguru import logger class Config: def __init__(self, filename, encoding="utf-8"): + logger.info("开始解析配置文件") self.filename = filename self.encoding = encoding self.config = ConfigParser() self.config.read(filename, encoding) + logger.info("结束解析配置文件") def get(self, section, option, default_value=None): if default_value is None: diff --git a/huawei.py b/huawei.py index 75817b8..978e42e 100644 --- a/huawei.py +++ b/huawei.py @@ -1,37 +1,37 @@ # -*- coding: utf-8 -*- # !/usr/bin/python import json -import locale import os.path -import sys import threading import time from datetime import datetime from urllib.parse import unquote +from loguru import logger from selenium.common import StaleElementReferenceException, NoSuchElementException, TimeoutException, \ ElementClickInterceptedException from selenium.webdriver.common.by import By -from selenium.webdriver.support.wait import WebDriverWait from selenium.webdriver.support import expected_conditions as EC +from selenium.webdriver.support.wait import WebDriverWait from browser.browser_factory import BrowserFactory from config import Config -import utils -from loguru import logger - from huawei_thread import HuaWeiThread +from tools import utils, time_utils class HuaWei: - projectPath = os.path.dirname(os.path.abspath(__file__)) - logPath = os.path.join(projectPath, "logs") - seleniumLogFile = os.path.join(logPath, "selenium.log") - baseProfilePath = os.path.join(projectPath, "profiles") - cookiesFile = os.path.join(projectPath, "hw_cookies.txt") - configFile = os.path.join(projectPath, "config.ini") + project_path = os.path.dirname(os.path.abspath(__file__)) + log_path = os.path.join(project_path, "logs") + selenium_log_file = os.path.join(log_path, "selenium.log") + base_profile_path = os.path.join(project_path, "profiles") + cookies_file = os.path.join(project_path, "hw_cookies.txt") + config_file = os.path.join(project_path, "config.ini") config = None browser = None + browser_type = None + driver_wait = None + isLogin = False isWaiting = True isCountdown = True @@ -39,10 +39,8 @@ class HuaWei: isCanSubmitOrder = False isBuyNow = False secKillTime = None - hwServerTimestamp = None - localTimestamp = None - driverWait = None - browserType = None + hw_server_timestamp = None + local_timestamp = None tipMsgs = [ '抱歉,已售完,下次再来', '抱歉,没有抢到', @@ -59,20 +57,23 @@ class HuaWei: '秒杀火爆
该秒杀商品已售罄', ] - def __init__(self, profilePath=None): - logger.info("开始解析配置文件") - self.config = Config(self.configFile) - logger.info("结束解析配置文件") - self.browserType = self.config.get("browser", "type", 'chrome') + def __init__(self, profile_path=None): + self.config = Config(self.config_file) + self.browser_type = self.config.get("browser", "type", 'chrome') self.__pre_browser_setting() - if profilePath is None or profilePath == '': - profilePath = utils.get_profile_path(self.baseProfilePath, self.browserType, 1) - self.__browser_setting(profilePath) - self.__get_local_and_hw_server_time_diff() - self.driverWait = WebDriverWait(self.browser, 5, 0.01) + + if profile_path is None or profile_path == '': + profile_path = utils.get_profile_path(self.base_profile_path, self.browser_type, 1) + + self.__browser_setting(profile_path) + server_timestamp, local_timestamp, ms_diff = time_utils.local_hw_time_diff() + self.hw_server_timestamp = server_timestamp + self.local_timestamp = local_timestamp + self.driver_wait = WebDriverWait(self.browser, 5, 0.01) + utils.set_locale_chinese() def start_process(self): - logger.info("开启抢购华为手机 {0}".format(self.config.get("product", "name"))) + logger.info("开启抢购华为手机") self.__visit_official_website() self.__login() if self.isLogin: @@ -84,7 +85,7 @@ def start_process(self): self.__buy_now() def stop_process(self): - logger.info("结束抢购华为手机 {0}".format(self.config.get("product", "name"))) + logger.info("结束抢购华为手机") time.sleep(120) self.browser.quit() @@ -98,25 +99,24 @@ def thread_process(self): self.__start_buying() def __pre_browser_setting(self): - if not os.path.exists(self.logPath): - os.makedirs(self.logPath) + utils.create_directory(self.log_path) threadCount = max(int(self.config.get("process", "thread", '1')), 1) for i in range(1, threadCount + 1): - profilePath = utils.get_profile_path(self.baseProfilePath, self.browserType, i) - if not os.path.exists(profilePath): - os.makedirs(profilePath) + profile_path = utils.get_profile_path(self.base_profile_path, self.browser_type, i) + utils.create_directory(profile_path) - def __browser_setting(self, profilePath): + def __browser_setting(self, profile_path): logger.info("开始设置浏览器参数") - self.browser = BrowserFactory.build(self.browserType).setting(self.config, self.seleniumLogFile, profilePath) + self.browser = (BrowserFactory.build(self.browser_type) + .setting(self.config, self.selenium_log_file, profile_path)) self.browser.maximize_window() def __visit_official_website(self): logger.info("开始进入华为官网") self.browser.get('https://www.vmall.com/') try: - self.driverWait.until(EC.url_changes) + self.driver_wait.until(EC.url_changes) logger.info("已进入华为官网") self.__get_current_page_type() except TimeoutException: @@ -160,12 +160,12 @@ def __login(self): def __cookies_save(self): cookies = self.browser.get_cookies() - with open(self.cookiesFile, 'w') as f: + with open(self.cookies_file, 'w') as f: f.write(json.dumps(cookies)) f.close() def __load_cookies(self): - with open(self.cookiesFile, 'r') as f: + with open(self.cookies_file, 'r') as f: cookies = json.load(f) for cookie in cookies: self.browser.add_cookie(cookie) @@ -175,8 +175,8 @@ def __goto_login_page(self): loginLink = None times = 1 while loginLink is None and times < 3: - menu_links = self.driverWait.until( - EC.presence_of_all_elements_located((By.CSS_SELECTOR, '.r-1a7l8x0'))) + menu_links = self.driver_wait.until( + EC.presence_of_all_elements_located((By.CSS_SELECTOR, '.css-901oao.r-1a7l8x0.r-1enofrn.r-ueyrd6.r-1pn2ns4.r-gy4na3'))) for menu_link in menu_links: if '请登录' == menu_link.text: loginLink = menu_link @@ -191,7 +191,7 @@ def __goto_login_page(self): loginLink.click() try: - self.driverWait.until(EC.title_contains('华为帐号-登录')) + self.driver_wait.until(EC.title_contains('华为账号-登录')) logger.info("已跳转登录页面") self.__get_current_page_type() except TimeoutException: @@ -201,13 +201,13 @@ def __goto_login_page(self): def __do_login(self): logger.info("开始输入账号及密码") - inputElements = self.driverWait.until(EC.presence_of_all_elements_located((By.CLASS_NAME, "hwid-input"))) + inputElements = self.driver_wait.until(EC.presence_of_all_elements_located((By.CLASS_NAME, "hwid-input"))) inputElements[0].send_keys(self.config.get("user", "name")) inputElements[1].send_keys(self.config.get("user", "password")) logger.info("已输入账号及密码") - self.driverWait.until(EC.presence_of_element_located((By.CLASS_NAME, "hwid-login-btn"))).click() + self.driver_wait.until(EC.presence_of_element_located((By.CLASS_NAME, "hwid-login-btn"))).click() logger.info("发起登录请求") def __check_logged_result(self): @@ -275,7 +275,7 @@ def __check_is_need_verification_code(self): logger.info("检查是否需要获取验证码") isNeedVerificationCode = False try: - isNeedVerificationCode = self.driverWait.until(EC.text_to_be_present_in_element( + isNeedVerificationCode = self.driver_wait.until(EC.text_to_be_present_in_element( (By.CSS_SELECTOR, ".hwid-dialog-main .hwid-getAuthCode .hwid-smsCode"), "获取验证码")) except TimeoutException: @@ -300,7 +300,7 @@ def __check_is_input_verification_code(self): def __click_send_verification_code(self): logger.info("进行短信验证码发送") try: - self.driverWait.until(EC.presence_of_element_located((By.CLASS_NAME, "hwid-smsCode"))).click() + self.driver_wait.until(EC.presence_of_element_located((By.CLASS_NAME, "hwid-smsCode"))).click() logger.success("短信验证码已发送") except TimeoutException: logger.warning("短信验证码已发送超时") @@ -309,7 +309,7 @@ def __check_is_need_trust_browser(self): logger.info("检查是否需要信任浏览器") isNeedTrustBrowser = False try: - isNeedTrustBrowser = self.driverWait.until(EC.text_to_be_present_in_element( + isNeedTrustBrowser = self.driver_wait.until(EC.text_to_be_present_in_element( (By.CSS_SELECTOR, ".hwid-trustBrowser"), "是否信任此浏览器?")) except TimeoutException: pass @@ -322,7 +322,7 @@ def __trust_browser(self): while isNeedTrustBrowser: logger.info("等待信任浏览器中......") try: - buttons = self.driverWait.until(EC.presence_of_all_elements_located( + buttons = self.driver_wait.until(EC.presence_of_all_elements_located( (By.CSS_SELECTOR, '.hwid-trustBrowser .hwid-dialog-textBtnBox .normalBtn'))) for button in buttons: if '信任' == button.text: @@ -334,7 +334,7 @@ def __trust_browser(self): def __current_is_login_page(self): try: - isLoginPage = self.driverWait.until_not(EC.url_contains("id1.cloud.huawei.com/CAS/portal/loginAuth.html")) + isLoginPage = self.driver_wait.until_not(EC.url_contains("id1.cloud.huawei.com/CAS/portal/loginAuth.html")) except TimeoutException: isLoginPage = True pass @@ -355,9 +355,10 @@ def __get_logged_nickname(self): def __visit_product_page(self): currentUrl = self.browser.current_url logger.info("开始进入华为 {0} 产品详情页".format(self.config.get("product", "name"))) - self.browser.get("https://www.vmall.com/product/{0}.html".format(self.config.get("product", "id"))) + self.browser.get( + "https://www.vmall.com/product/comdetail/index.html?prdId={0}".format(self.config.get("product", "id"))) try: - self.driverWait.until(EC.url_changes(currentUrl)) + self.driver_wait.until(EC.url_changes(currentUrl)) logger.info("已进入华为 {0} 产品详情页".format(self.config.get("product", "name"))) self.__get_current_page_type() except TimeoutException: @@ -369,17 +370,17 @@ def __waiting_count(self): times = 1 while self.isWaiting and times <= 3: try: - if EC.text_to_be_present_in_element((By.CSS_SELECTOR, "#pro-operation > a"), "暂不售卖")( + if EC.text_to_be_present_in_element((By.ID, "prd-botnav-rightbtn"), "暂不售卖")( self.browser): logger.info("【{}】倒计时未开始,等待中...", "暂不售卖") time.sleep(120) self.__refresh_product_page() - elif EC.text_to_be_present_in_element((By.CSS_SELECTOR, "#pro-operation > a"), "暂时缺货")( + elif EC.text_to_be_present_in_element((By.ID, "prd-botnav-rightbtn"), "暂时缺货")( self.browser): logger.info("【{}】倒计时未开始,等待中...", "暂时缺货") time.sleep(120) self.__refresh_product_page() - elif EC.text_to_be_present_in_element((By.CSS_SELECTOR, "#pro-operation > a"), "即将开始")( + elif EC.text_to_be_present_in_element((By.ID, "prd-botnav-rightbtn"), "开始")( self.browser): logger.info("倒计时即将开始") self.__get_sec_kill_time() @@ -387,7 +388,7 @@ def __waiting_count(self): self.__set_end_waiting() time.sleep(1) else: - logger.info("当前可立即下单") + logger.info("当前可立即购买") self.__set_end_countdown() self.__set_buy_now() except NoSuchElementException: @@ -409,38 +410,35 @@ def __choose_product(self): def __choose_product_sets(self, sets): logger.info("开始选择手机套装规格") set_skus = sets.split(",") + sku_buttons = self.driver_wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, + ".css-1dbjc4n.r-1h0z5md.r-9aemit .css-1dbjc4n.r-18u37iz.r-1w6e6rj .css-1dbjc4n.r-1loqt21.r-1otgn73"))) for sku in set_skus: - self.driverWait.until(EC.presence_of_element_located((By.LINK_TEXT, f"{sku}"))).click() - sku_payment = '无' - if EC.text_to_be_present_in_element((By.CSS_SELECTOR, "#pro-skus > dl:last-child > label"), "选择销售类型")( - self.browser): - sku_payment = self.config.get("product", "payment", "全款购买") - self.driverWait.until(EC.presence_of_element_located((By.LINK_TEXT, f"{sku_payment}"))).click() - logger.info("选择手机套装规格完成,套装规格:{0} 销售类型:{1}".format(sets, sku_payment)) + for sku_button in sku_buttons: + if sku_button.text == sku: + sku_button.click() + logger.info("选择手机套装规格完成,套装规格:{0}".format(sets)) def __choose_product_item(self): logger.info("开始选择手机单品规格") sku_color = self.config.get("product", "color") sku_version = self.config.get("product", "version") - self.driverWait.until(EC.presence_of_element_located((By.LINK_TEXT, f"{sku_color}"))).click() - self.driverWait.until(EC.presence_of_element_located((By.LINK_TEXT, f"{sku_version}"))).click() - sku_payment = '无' - if EC.text_to_be_present_in_element((By.CSS_SELECTOR, "#pro-skus > dl:last-child > label"), "选择销售类型")( - self.browser): - sku_payment = self.config.get("product", "payment") - self.driverWait.until(EC.presence_of_element_located((By.LINK_TEXT, f"{sku_payment}"))).click() - logger.info("选择手机单品规格完成,颜色:{0} 版本:{1} 销售类型:{1}".format(sku_color, sku_version, sku_payment)) + + sku_buttons = self.driver_wait.until(EC.presence_of_all_elements_located((By.CSS_SELECTOR, + ".css-1dbjc4n.r-1h0z5md.r-9aemit .css-1dbjc4n.r-18u37iz.r-1w6e6rj .css-1dbjc4n.r-1loqt21.r-1otgn73"))) + for sku_button in sku_buttons: + if sku_button.text == sku_color or sku_button.text == sku_version: + sku_button.click() + logger.info("选择手机单品规格完成,颜色:{0} 版本:{1}".format(sku_color, sku_version)) def __countdown(self): while self.isCountdown: - currentUrl = self.browser.current_url - countdownMsDiff = utils.calc_countdown_ms_diff(self.secKillTime, - self.localTimestamp - self.hwServerTimestamp) - countdownTimes = utils.calc_countdown_times(self.secKillTime, - self.localTimestamp - self.hwServerTimestamp) + countdownMsDiff = time_utils.calc_countdown_ms_diff(self.secKillTime, + self.local_timestamp - self.hw_server_timestamp) + countdownTimes = time_utils.calc_countdown_times(self.secKillTime, + self.local_timestamp - self.hw_server_timestamp) if countdownMsDiff > 120000: - logger.info("距离抢购开始还剩:{}", utils.format_countdown_time(countdownTimes)) + logger.info("距离抢购开始还剩:{}", time_utils.format_countdown_time(countdownTimes)) time.sleep(5) else: self.__set_end_countdown() @@ -450,18 +448,18 @@ def __start_buying(self): self.__create_and_start_thread() clickTimes = 1 while self.isStartBuying: - countdownMsDiff = utils.calc_countdown_ms_diff(self.secKillTime, - self.localTimestamp - self.hwServerTimestamp) - countdown_times = utils.calc_countdown_times(self.secKillTime, - self.localTimestamp - self.hwServerTimestamp) + countdownMsDiff = time_utils.calc_countdown_ms_diff(self.secKillTime, + self.local_timestamp - self.hw_server_timestamp) + countdown_times = time_utils.calc_countdown_times(self.secKillTime, + self.local_timestamp - self.hw_server_timestamp) if countdownMsDiff > 1000: - logger.info("距离抢购活动最后下单环节开始还剩:{}", utils.format_countdown_time(countdown_times)) + logger.info("距离抢购活动最后下单环节开始还剩:{}", time_utils.format_countdown_time(countdown_times)) time.sleep(1) elif countdownMsDiff > 100: - logger.info("距离抢购活动最后下单环节开始还剩:{}", utils.format_countdown_time(countdown_times)) + logger.info("距离抢购活动最后下单环节开始还剩:{}", time_utils.format_countdown_time(countdown_times)) time.sleep(0.1) elif countdownMsDiff > 10: - logger.info("距离抢购活动最后下单环节开始还剩:{}", utils.format_countdown_time(countdown_times)) + logger.info("距离抢购活动最后下单环节开始还剩:{}", time_utils.format_countdown_time(countdown_times)) time.sleep(0.01) else: logger.info("抢购活动最后下单环节,进行第 {} 次尝试立即下单", clickTimes) @@ -478,9 +476,8 @@ def __create_and_start_thread(self): if currentThread is threading.main_thread() and threadCount > 1: logger.info("开始创建多线程,需要创建线程数:{}", threadCount) for i in range(2, threadCount + 1): - profilePath = os.path.join(os.path.join(self.baseProfilePath, self.browserType), - 'profile_{0}'.format(i)) - t = HuaWeiThread(i, HuaWei(profilePath)) + profile_path = utils.get_profile_path(self.base_profile_path, self.browser_type, i) + t = HuaWeiThread(i, HuaWei(profile_path)) t.start() else: logger.warning("非主线程或配置线程数不大于1,无需创建多线程") @@ -489,10 +486,10 @@ def __create_and_start_thread(self): def __do_start_buying(self): if not self.isCanSubmitOrder: try: - buttons = self.browser.find_elements(By.CSS_SELECTOR, '#pro-operation > span') + buttons = self.browser.find_elements(By.CSS_SELECTOR, '#prd-botnav-rightbtn') orderBtn = None for button in buttons: - if '立即下单' == button.text: + if '立即购买' == button.text: orderBtn = button if orderBtn is not None: @@ -597,9 +594,9 @@ def __check_can_submit_order(self): logger.info("检查是否可以进行下单操作") self.__check_box_ct_pop_stage() self.__get_current_page_type() - isOrderSubmitPage = EC.url_contains("www.vmall.com/order/nowConfirmcart")(self.browser) + window_size = len(self.browser.window_handles) checkResult = 1 - if not isOrderSubmitPage: + if window_size <= 1: iframeBoxExists = self.__check_iframe_box_pop_exists() if iframeBoxExists: iframe = self.browser.find_element(By.CSS_SELECTOR, '#iframeBox #queueIframe') @@ -649,24 +646,27 @@ def __check_can_submit_order(self): checkResultDict[checkResult]) if checkResult == 1: self.__set_end_start_buying() + new_tab = self.browser.window_handles[-1] + self.browser.switch_to.window(new_tab) def __buy_now(self): if self.isBuyNow: - currentUrl = self.browser.current_url logger.info("开始立即购买") try: - buttons = self.driverWait.until( - EC.presence_of_all_elements_located((By.CSS_SELECTOR, '#pro-operation > a'))) + buttons = self.driver_wait.until( + EC.presence_of_all_elements_located((By.CSS_SELECTOR, '#prd-botnav-rightbtn'))) orderBtn = None for button in buttons: - if '立即下单' == button.text: + if '立即购买' == button.text: orderBtn = button if orderBtn is not None: orderBtn.click() - time.sleep(1) + new_tab = self.browser.window_handles[-1] + self.browser.switch_to.window(new_tab) + currentUrl = self.browser.current_url self.__click_submit_order(currentUrl) except (NoSuchElementException, ElementClickInterceptedException, StaleElementReferenceException) as e: - logger.info("未找到【立即下单】按钮或按钮不可点击; except:{}", e) + logger.info("未找到【立即购买】按钮或按钮不可点击; except:{}", e) logger.info("结束立即购买") def __submit_order(self): @@ -744,7 +744,8 @@ def __get_current_page_type(self): pageType = 'index' elif currentUrl.find('id1.cloud.huawei.com/CAS/portal/loginAuth.html') != -1: pageType = 'login' - elif currentUrl.find("www.vmall.com/product/{0}.html".format(self.config.get("product", "id", ""))) != -1: + elif currentUrl.find("www.vmall.com/product/comdetail/index.html?prdId={0}".format( + self.config.get("product", "id", ""))) != -1: pageType = 'product' elif currentUrl.find("www.vmall.com/order/nowConfirmcart") != -1: pageType = 'order' @@ -761,11 +762,8 @@ def __get_sec_kill_time(self): tryTimes = 1 while self.secKillTime is None and tryTimes < 3: try: - if sys.platform.startswith('win'): - locale.setlocale(locale.LC_ALL, 'en') - locale.setlocale(locale.LC_CTYPE, 'chinese') - countdownStr = self.browser.find_element(By.CSS_SELECTOR, "#pro-operation-countdown > p").text - countdownStr = datetime.now().strftime("%Y年") + countdownStr[:-3] + countdownStr = self.browser.find_element(By.CSS_SELECTOR, "#prd-detail .css-901oao.r-jwli3a.r-1b43r93.r-13uqrnb.r-16dba41.r-oxtfae.r-hjklzo.r-6dt33c") + countdownStr = datetime.now().strftime("%Y年") + countdownStr.text[5:] self.secKillTime = datetime.strptime(countdownStr, "%Y年%m月%d日 %H:%M") logger.info("抢购开始时间为:[{}]", self.secKillTime) except (StaleElementReferenceException, NoSuchElementException): @@ -773,16 +771,3 @@ def __get_sec_kill_time(self): tryTimes += 1 logger.info("获取抢购开始时间结束") - - def __get_local_and_hw_server_time_diff(self): - logger.info("开始获取华为服务器时间及本地时间") - self.hwServerTimestamp = utils.get_hw_server_timestamp() - self.localTimestamp = utils.get_local_timestamp() - - logger.info("当前华为服务器时间为:[{}]", utils.timestamp2time(self.hwServerTimestamp)) - logger.info("当前本地时间为:【{}】", utils.timestamp2time(self.localTimestamp)) - - timeDiff = self.localTimestamp - self.hwServerTimestamp - compareRes = "晚于" if timeDiff >= 0 else "早于" - logger.info("结束获取华为服务器时间及本地时间,结果:本地时间【{}】华为服务器时间【{}】毫秒", compareRes, - abs(timeDiff)) diff --git a/huawei_thread.py b/huawei_thread.py index cd4c33c..fd19ad8 100644 --- a/huawei_thread.py +++ b/huawei_thread.py @@ -6,12 +6,12 @@ class HuaWeiThread(Thread): - def __init__(self, serialNo, huawei): - self.serialNo = serialNo + def __init__(self, serial_no, huawei): + self.serial_no = serial_no self.huawei = huawei - Thread.__init__(self, name="thread_{0}".format(self.serialNo)) + Thread.__init__(self, name="thread_{0}".format(self.serial_no)) def run(self): - logger.info("线程:{} 开始运行,当前活跃线程数为:{}", threading.current_thread().name, threading.activeCount()) + logger.info("线程:{} 开始运行,当前活跃线程数为:{}", threading.current_thread().name, threading.active_count()) self.huawei.thread_process() self.huawei.stop_process() diff --git a/main.py b/main.py index f84d9ac..e9f963e 100644 --- a/main.py +++ b/main.py @@ -1,12 +1,12 @@ # -*- coding: utf-8 -*- # !/usr/bin/python import os -import sys -from datetime import datetime from selenium.common import WebDriverException, NoSuchWindowException from huawei import HuaWei from loguru import logger +from tools.my_logger import setup_logger + def main(): huawei = HuaWei() @@ -14,27 +14,12 @@ def main(): huawei.start_process() huawei.stop_process() except NoSuchWindowException: - logger.info("⃣已关闭浏览器窗口,程序自动退出") + logger.info("已关闭浏览器窗口,程序自动退出") except WebDriverException as we: logger.error("程序执行异常:except: {}", we) - finally: - if os.path.exists(huawei.cookiesFile): - os.remove(huawei.cookiesFile) - - -def log(): - log_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "logs") - log_name = os.path.join(log_path, "log_all_{}.log".format(datetime.now().strftime('%Y%m%d'))) - log_error_name = os.path.join(log_path, "log_error_{}.log".format(datetime.now().strftime('%Y%m%d'))) - logFormat = ("{time:YYYY-MM-DD HH:mm:ss.SSS} | " - "{level: <8} | {name}:{function}:{line} - " - "{process.name}:{thread.name} - {message}") - logger.remove() - logger.add(log_name, format=logFormat, rotation='100 MB', retention='15 days', level="DEBUG", encoding='utf8', - enqueue=True) - logger.add(log_error_name, format=logFormat, rotation='100 MB', retention='15 days', level="ERROR", encoding='utf8', - enqueue=True) - logger.add(sink=sys.stdout, format=logFormat, level="DEBUG") + # finally: + # if os.path.exists(huawei.cookies_file): + # os.remove(huawei.cookies_file) if __name__ == '__main__': @@ -47,10 +32,10 @@ def log(): 888 888 `888'`888' oo .d8P 888 .o 888 .o8 888 `88b. 888 888 888 o888o o888o `8' `8' 8""88888P' `Y8bod8P' `Y8bod8P' o888o o888o o888o o888o o888o """ - log() + setup_logger() logger.info(banner) try: main() except KeyboardInterrupt: - logger.info("正常退出") + logger.info("程序正常退出") exit(0) diff --git a/tools/my_logger.py b/tools/my_logger.py new file mode 100644 index 0000000..93d0531 --- /dev/null +++ b/tools/my_logger.py @@ -0,0 +1,24 @@ +# -*- coding: utf-8 -*- +# !/usr/bin/python +import os +import sys +from datetime import datetime + +from loguru import logger + +log_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "logs") +LOG_FILENAME = os.path.join(log_path, "log_all_{}.log".format(datetime.now().strftime('%Y%m%d'))) +LOG_ERROR_FILENAME = os.path.join(log_path, "log_error_{}.log".format(datetime.now().strftime('%Y%m%d'))) + + +def setup_logger(): + log_format = ("{time:YYYY-MM-DD HH:mm:ss.SSS} | " + "{level: <8} | {name}:{function}:{line} - " + "{process.name}:{thread.name} - {message}") + logger.remove() + logger.add(LOG_FILENAME, format=log_format, rotation='100 MB', retention='15 days', level="DEBUG", encoding='utf8', + enqueue=True) + logger.add(LOG_ERROR_FILENAME, format=log_format, rotation='100 MB', retention='15 days', level="ERROR", + encoding='utf8', + enqueue=True) + logger.add(sink=sys.stdout, format=log_format, level="DEBUG") diff --git a/tools/time_utils.py b/tools/time_utils.py new file mode 100644 index 0000000..ebc8124 --- /dev/null +++ b/tools/time_utils.py @@ -0,0 +1,106 @@ +# -*- coding: utf-8 -*- +# !/usr/bin/python +import math +import time + +import requests +from loguru import logger +from datetime import datetime, timedelta + + +def server_time(): + logger.info("开始获取华为服务器时间") + url = "https://buy.vmall.com/queryRushbuyInfo.json?sbomCodes=2601010453707&portal=1&t=1697127872971" + response = requests.get(url) + if response.ok: + data = response.json() + return data['currentTime'] + else: + logger.error("华为服务器获取时间失败!") + + +def local_time(): + logger.info("开始获取本地机器时间") + local_timestamp = math.floor(time.time() * 1000) + return local_timestamp + + +def local_hw_time_diff(): + start_timestamp = local_time() + server_timestamp = server_time() + end_timestamp = local_time() + + # 使用平均时间来获取更准确的本地时间戳 + local_timestamp = round((start_timestamp + end_timestamp) / 2) + ms_diff = milliseconds_diff(local_timestamp, server_timestamp) + logger.info("当前华为服务器时间为:[{}]", timestamp2time(server_timestamp)) + logger.info("当前本地时间为:【{}】", timestamp2time(local_timestamp)) + + compareRes = "晚于" if ms_diff >= 0 else "早于" + logger.info("结束获取华为服务器时间及本地时间,结果:本地时间【{}】华为服务器时间【{}】毫秒", compareRes, + abs(ms_diff)) + return server_timestamp, local_timestamp, ms_diff + + +def format_countdown_time(countdown_times): + countdown_all_times = [] + countdown_time_units = ["天", "时", "分", "秒", "毫秒"] + for index, countdown_time in enumerate(countdown_times): + countdown_all_times.append(countdown_time) + countdown_all_times.append(countdown_time_units[index]) + return " ".join(countdown_all_times) + + +def get_start_buying_time(countdown_times): + current_date = datetime.strptime(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), '%Y-%m-%d %H:%M:%S') + days_delta = timedelta(days=int(countdown_times[0])) + hours_delta = timedelta(hours=int(countdown_times[1])) + minutes_delta = timedelta(minutes=int(countdown_times[2])) + seconds_delta = timedelta(seconds=int(countdown_times[3])) + start_buying_date = current_date + days_delta + hours_delta + minutes_delta + seconds_delta + return start_buying_date + + +def date_second_add(date, seconds): + seconds_delta = timedelta(seconds=seconds) + return date + seconds_delta + + +def milliseconds_diff(local_timestamp, hw_server_timestamp): + ms_diff = local_timestamp - hw_server_timestamp + return ms_diff + + +def seconds_diff(d1, d2): + return (d2 - d1).total_seconds() + + +def timestamp2time(timestamp): + fromDatetime = datetime.fromtimestamp(timestamp / 1000) + return fromDatetime.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + + +def calc_countdown_ms_diff(target_date_time, ms_diff): + local_timestamp = local_time() - ms_diff + target_timestamp = math.floor(target_date_time.timestamp() * 1000) + return target_timestamp - local_timestamp + + +def calc_countdown_times(target_date_time, ms_diff): + local_timestamp = local_time() - ms_diff + target_timestamp = math.floor(target_date_time.timestamp() * 1000) + origin_timestamp_sec_diff = (target_timestamp - local_timestamp) / 1000 + timestamp_sec_diff = math.floor(origin_timestamp_sec_diff) + + days = max(math.floor(timestamp_sec_diff / 86400), 0) + timestamp_sec_diff = timestamp_sec_diff - days * 86400 + hours = max(math.floor(timestamp_sec_diff / 3600), 0) + timestamp_sec_diff = timestamp_sec_diff - hours * 3600 + minutes = max(math.floor(timestamp_sec_diff / 60), 0) + seconds = max(timestamp_sec_diff - minutes * 60, 0) + microseconds = max( + math.floor((origin_timestamp_sec_diff - days * 86400 - hours * 3600 - minutes * 60 - seconds) * 1000), + 0) + countdown_times = [str(days).zfill(2), str(hours).zfill(2), str(minutes).zfill(2), str(seconds).zfill(2), + str(microseconds).zfill(3)] + return countdown_times diff --git a/tools/utils.py b/tools/utils.py new file mode 100644 index 0000000..a93b33a --- /dev/null +++ b/tools/utils.py @@ -0,0 +1,22 @@ +# -*- coding: utf-8 -*- +# !/usr/bin/python +import locale +import os +import sys + + +def get_profile_path(base_profile_path, browser_type, serial_no=1): + base_browser_profile_path = os.path.join(base_profile_path, browser_type) + profile_path = os.path.join(base_browser_profile_path, "profile_{0}".format(serial_no)) + return profile_path + + +def create_directory(directory_path): + if not os.path.exists(directory_path): + os.makedirs(directory_path) + + +def set_locale_chinese(): + if sys.platform.startswith('win'): + locale.setlocale(locale.LC_ALL, 'en') + locale.setlocale(locale.LC_CTYPE, 'chinese') diff --git a/utils.py b/utils.py deleted file mode 100644 index 90dc0df..0000000 --- a/utils.py +++ /dev/null @@ -1,82 +0,0 @@ -# -*- coding: utf-8 -*- -# !/usr/bin/python -import math -import os -import time -from datetime import datetime, timedelta - -import requests - - -def format_countdown_time(countdown_times): - countdown_all_times = [] - countdown_time_units = ["天", "时", "分", "秒", "毫秒"] - for index, countdown_time in enumerate(countdown_times): - countdown_all_times.append(countdown_time) - countdown_all_times.append(countdown_time_units[index]) - return " ".join(countdown_all_times) - - -def get_start_buying_time(countdown_times): - current_date = datetime.strptime(datetime.now().strftime('%Y-%m-%d %H:%M:%S'), '%Y-%m-%d %H:%M:%S') - days_delta = timedelta(days=int(countdown_times[0])) - hours_delta = timedelta(hours=int(countdown_times[1])) - minutes_delta = timedelta(minutes=int(countdown_times[2])) - seconds_delta = timedelta(seconds=int(countdown_times[3])) - start_buying_date = current_date + days_delta + hours_delta + minutes_delta + seconds_delta - return start_buying_date - - -def date_second_add(date, seconds): - seconds_delta = timedelta(seconds=seconds) - return date + seconds_delta - - -def seconds_diff(d1, d2): - return (d2 - d1).total_seconds() - - -def get_hw_server_timestamp(): - res = requests.get("https://buy.vmall.com/queryRushbuyInfo.json?sbomCodes=2601010453707&portal=1&t=1697127872971") - if res.ok: - data = res.json() - return data['currentTime'] - - -def get_local_timestamp(): - return math.floor(time.time() * 1000) - - -def timestamp2time(timestamp): - fromDatetime = datetime.fromtimestamp(timestamp / 1000) - return fromDatetime.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] - - -def calc_countdown_ms_diff(targetDatetime, msDiff): - localTimestamp = get_local_timestamp() - msDiff - targetTimestamp = math.floor(targetDatetime.timestamp() * 1000) - return targetTimestamp - localTimestamp - - -def calc_countdown_times(targetDatetime, msDiff): - localTimestamp = get_local_timestamp() - msDiff - targetTimestamp = math.floor(targetDatetime.timestamp() * 1000) - originTimestampDiff = (targetTimestamp - localTimestamp) / 1000 - timestampDiff = math.floor(originTimestampDiff) - - days = max(math.floor(timestampDiff / 86400), 0) - timestampDiff = timestampDiff - days * 86400 - hours = max(math.floor(timestampDiff / 3600), 0) - timestampDiff = timestampDiff - hours * 3600 - minutes = max(math.floor(timestampDiff / 60), 0) - seconds = max(timestampDiff - minutes * 60, 0) - microseconds = max(math.floor((originTimestampDiff - days * 86400 - hours * 3600 - minutes * 60 - seconds) * 1000), - 0) - countdown_times = [str(days).zfill(2), str(hours).zfill(2), str(minutes).zfill(2), str(seconds).zfill(2), - str(microseconds).zfill(3)] - return countdown_times - - -def get_profile_path(baseProfilePath, browserType, profileIndex): - baseBrowserProfilePath = os.path.join(baseProfilePath, browserType) - return os.path.join(baseBrowserProfilePath, "profile_{0}".format(profileIndex))