Skip to content

Commit

Permalink
Update StockBase, make it more usable
Browse files Browse the repository at this point in the history
  • Loading branch information
jimmysitu committed Oct 23, 2024
1 parent 8e4a40e commit 012d1fd
Showing 1 changed file with 135 additions and 79 deletions.
214 changes: 135 additions & 79 deletions msfinance/stocks.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from selenium_stealth import stealth
import undetected_chromedriver as uc

# For Chrome driver
Expand All @@ -47,56 +48,16 @@
}

class StockBase:
def __init__(self, debug=False, browser='chrome', database='msfinance.db3', session_factory=None, proxy=None):
def __init__(self, debug=False, browser='chrome', database='msfinance.db3', session_factory=None, proxy=None, driver_type='uc'):
self.debug = debug

# Initialize UserAgent for random user-agent generation
self.ua = UserAgent()

if('chrome' == browser):
# Chrome support
self.options = webdriver.ChromeOptions()

# Set a random user-agent
self.options.add_argument(f"--user-agent={self.ua.random}")

# Setting download directory
self.download_dir = os.path.join(tempfile.gettempdir(), 'msfinance', str(os.getpid()))
if debug:
print(f"Download directory: {self.download_dir}")

if not os.path.exists(self.download_dir):
os.makedirs(self.download_dir)

# Use headless mode
if not debug:
self.options.add_argument("--headless")
else:
self.options.add_argument("--start-maximized")
self.options.add_argument("--disable-popup-blocking")

if proxy:
[protocol, host, port] = re.split(r'://|:', proxy)
if 'socks5' == protocol:
self.options.add_argument(f'--proxy-server=socks5://{host}:{port}')
else:
print("No supported proxy protocol")
exit(1)

# Initialize the undetected_chromedriver
self.driver = uc.Chrome(
version_main=126,
use_subprocess=False,
service=webdriver.ChromeService(ChromeDriverManager(driver_version='126').install()),
options=self.options)

# Change download directory
params = {
"behavior": "allow",
"downloadPath": self.download_dir,
}
self.driver.execute_cdp_cmd("Page.setDownloadBehavior", params)
self.driver_type = driver_type

if browser == 'chrome':
self.setup_chrome_driver(proxy)
else:
# Default: firefox
self.options = webdriver.FirefoxOptions()
Expand Down Expand Up @@ -158,20 +119,55 @@ def __init__(self, debug=False, browser='chrome', database='msfinance.db3', sess
"https": proxy,
}

# Open Morningstar home page
url = f"https://www.morningstar.com"
self.driver.get(url)
# Wait for the page to load, and pass the human verification
time.sleep(30)

# Open Morningstar stock page
url = f"https://www.morningstar.com/stocks"
self.driver.get(url)
time.sleep(10)
time.sleep(20)

def __del__(self):
if not self.debug:
self.driver.quit()

def setup_chrome_driver(self, proxy):
# Chrome support
self.options = webdriver.ChromeOptions()

# Set a random user-agent
self.options.add_argument(f"--user-agent={self.ua.random}")

# Setting download directory
self.download_dir = os.path.join(tempfile.gettempdir(), 'msfinance', str(os.getpid()))
if self.debug:
print(f"Download directory: {self.download_dir}")

if not os.path.exists(self.download_dir):
os.makedirs(self.download_dir)

# Use headless mode
if not self.debug:
self.options.add_argument("--headless")
else:
self.options.add_argument("--start-maximized")
self.options.add_argument("--disable-popup-blocking")

if proxy:
[protocol, host, port] = re.split(r'://|:', proxy)
if 'socks5' == protocol:
self.options.add_argument(f'--proxy-server=socks5://{host}:{port}')
else:
print("No supported proxy protocol")
exit(1)

# Initialize the undetected_chromedriver
self.initialize_driver()

# Change download directory
params = {
"behavior": "allow",
"downloadPath": self.download_dir,
}
self.driver.execute_cdp_cmd("Page.setDownloadBehavior", params)

def _check_database(self, unique_id):
'''
Check database if table with unique_id exists, and return it as a DataFrame
Expand Down Expand Up @@ -219,48 +215,52 @@ def _human_delay(self, min=3, max=15):

def _random_mouse_move(self):
'''Simulate random mouse movement'''
if self.debug:
print("Simulate random mouse movement")
actions = ActionChains(self.driver)
# Get window size
window_size = self.driver.get_window_size()
# Random start position
start_x = random.randint(0, window_size['width'] - 1)
start_y = random.randint(0, window_size['height'] - 1)
actions.move_by_offset(start_x, start_y).perform()
self._human_delay(0.5, 1.5)
# Random end position
end_x = random.randint(0, window_size['width'] - 1)
end_y = random.randint(0, window_size['height'] - 1)
actions.move_by_offset(end_x, end_y).perform()
self._human_delay(0.5, 1.5)


element = self.driver.find_element(By.TAG_NAME, 'body')
target_x = random.randint(100, 200)
target_y = random.randint(100, 200)
if self.debug:
print(f"Target position: {target_x}, {target_y}")
actions.move_to_element_with_offset(element, target_x, target_y).perform()
self._human_delay(1, 5)

def _random_scroll(self):
'''Simulate random page scrolling'''
if self.debug:
print("Simulate random page scrolling")
scroll_height = self.driver.execute_script("return document.body.scrollHeight")
random_position = random.randint(0, scroll_height)
random_position = random.randint(5, scroll_height>>2 + 1)
self.driver.execute_script(f"window.scrollTo(0, {random_position});")
self._human_delay(0.5, 1.5)
self._human_delay(1, 5)

def _random_click(self):
'''Simulate random click on a blank area of the page'''
if self.debug:
print("Simulate random click on a blank area of the page")
window_size = self.driver.get_window_size()
x = random.randint(0, window_size['width'])
y = random.randint(0, window_size['height'])
x = random.randint(0, window_size['width'] - 100)
y = random.randint(0, window_size['height'] - 100)
try:
actions = ActionChains(self.driver)
actions.move_by_offset(x, y).click().perform()
self._human_delay(1, 2)
self._human_delay(1, 5)
# Reset mouse position
actions.move_by_offset(-x, -y).perform()
except Exception:
pass # Ignore exceptions from failed clicks

def _random_typing(self, element, text):
'''Simulate random keyboard typing'''
if self.debug:
print(f"Simulate random keyboard typing: {text}")
for char in text:
element.send_keys(char)
time.sleep(random.uniform(0.05, 0.3)) # Random delay between each character

@retry(wait_fixed=2000, stop_max_attempt_number=3)
@retry(wait_random_min=1000, wait_random_max=5000, stop_max_attempt_number=3)
def _get_valuation(self, ticker, exchange, statistics, update=False):

# Compose a unique ID for database table and file name
Expand All @@ -281,7 +281,9 @@ def _get_valuation(self, ticker, exchange, statistics, update=False):
self._human_delay()
self._random_scroll()

statistics_button = self.driver.find_element(By.XPATH, f"//button[contains(., '{statistics}')]")
statistics_button = WebDriverWait(self.driver, 30).until(
EC.visibility_of_element_located((By.XPATH, f"//button[contains(., '{statistics}')]"))
)
statistics_button.click()

# More human-like operations
Expand Down Expand Up @@ -326,7 +328,7 @@ def _get_valuation(self, ticker, exchange, statistics, update=False):

return df

@retry(wait_fixed=2000, stop_max_attempt_number=3)
@retry(wait_random_min=1000, wait_random_max=5000, stop_max_attempt_number=3)
def _get_financials(self, ticker, exchange, statement, period='Annual', stage='Restated', update=False):

# Compose a unique ID for database table and file name
Expand All @@ -348,7 +350,9 @@ def _get_financials(self, ticker, exchange, statement, period='Annual', stage='R
self._random_scroll()

# Select statement type
type_button = self.driver.find_element(By.XPATH, f"//button[contains(., '{statement}')]")
type_button = WebDriverWait(self.driver, 30).until(
EC.visibility_of_element_located((By.XPATH, f"//button[contains(., '{statement}')]"))
)
type_button.click()

# More human-like operations
Expand All @@ -357,17 +361,23 @@ def _get_financials(self, ticker, exchange, statement, period='Annual', stage='R
self._random_scroll()

# Select statement period
period_list_button = self.driver.find_element(By.XPATH, "//button[contains(., 'Annual') and @aria-haspopup='true']")
period_list_button = WebDriverWait(self.driver, 30).until(
EC.visibility_of_element_located((By.XPATH, "//button[contains(., 'Annual') and @aria-haspopup='true']"))
)
try:
period_list_button.click()
self._human_delay()
except ElementClickInterceptedException:
pass

if 'Annual' == period:
period_button = self.driver.find_element(By.XPATH, "//span[contains(., 'Annual') and @class='mds-list-group__item-text__sal']")
period_button = WebDriverWait(self.driver, 30).until(
EC.visibility_of_element_located((By.XPATH, "//span[contains(., 'Annual') and @class='mds-list-group__item-text__sal']"))
)
else:
period_button = self.driver.find_element(By.XPATH, "//span[contains(., 'Quarterly') and @class='mds-list-group__item-text__sal']")
period_button = WebDriverWait(self.driver, 30).until(
EC.visibility_of_element_located((By.XPATH, "//span[contains(., 'Quarterly') and @class='mds-list-group__item-text__sal']"))
)

try:
period_button.click()
Expand All @@ -376,7 +386,9 @@ def _get_financials(self, ticker, exchange, statement, period='Annual', stage='R
pass

# Select statement stage
stage_list_button = self.driver.find_element(By.XPATH, "//button[contains(., 'As Originally Reported') and @aria-haspopup='true']")
stage_list_button = WebDriverWait(self.driver, 30).until(
EC.visibility_of_element_located((By.XPATH, "//button[contains(., 'As Originally Reported') and @aria-haspopup='true']"))
)
try:
stage_list_button.click()
self._human_delay()
Expand All @@ -386,9 +398,13 @@ def _get_financials(self, ticker, exchange, statement, period='Annual', stage='R
pass

if 'As Originally Reported' == stage:
stage_button = self.driver.find_element(By.XPATH, "//span[contains(., 'As Originally Reported') and @class='mds-list-group__item-text__sal']")
stage_button = WebDriverWait(self.driver, 30).until(
EC.visibility_of_element_located((By.XPATH, "//span[contains(., 'As Originally Reported') and @class='mds-list-group__item-text__sal']"))
)
else:
stage_button = self.driver.find_element(By.XPATH, "//span[contains(., 'Restated') and @class='mds-list-group__item-text__sal']")
stage_button = WebDriverWait(self.driver, 30).until(
EC.visibility_of_element_located((By.XPATH, "//span[contains(., 'Restated') and @class='mds-list-group__item-text__sal']"))
)

try:
stage_button.click()
Expand Down Expand Up @@ -462,6 +478,41 @@ def _get_us_exchange_tickers(self, exchange, update=False):
symbols = df['symbol'].tolist()
return symbols

def initialize_driver(self):
# Initialize the driver based on the driver_type
if self.driver_type == 'uc':
self.driver = uc.Chrome(
version_main=126,
use_subprocess=False,
service=webdriver.ChromeService(ChromeDriverManager(driver_version='126').install()),
options=self.options)
elif self.driver_type == 'stealth':
# Initialize the WebDriver (e.g., Chrome)
self.driver = webdriver.Chrome(
service=webdriver.ChromeService(ChromeDriverManager(driver_version='126').install()),
options=self.options)

# Apply selenium-stealth to the WebDriver
stealth(self.driver,
languages=["en-US", "en"],
vendor="Google Inc.",
platform="Win32",
webgl_vendor="Intel Inc.",
renderer="Intel Iris OpenGL Engine",
fix_hairline=True,
)
else:
raise ValueError("Invalid driver type specified")

def check_for_bot_confirmation(self):
'''Check if the page contains the string "Let's confirm you aren't a bot"'''
try:
# Use XPath to search for the text in the entire page
self.driver.find_element(By.XPATH, "//*[contains(text(), \"Let's confirm you aren't a bot\")]")
return True
except NoSuchElementException:
return False

# End of class StockBase

class Stock(StockBase):
Expand Down Expand Up @@ -666,3 +717,8 @@ def get_xase_tickers(self):
# End of class Stock







0 comments on commit 012d1fd

Please sign in to comment.