From 012d1fd475241e21150ee967cc0d99426b90b221 Mon Sep 17 00:00:00 2001 From: Jimmy Situ Date: Wed, 23 Oct 2024 23:29:23 +0800 Subject: [PATCH] Update StockBase, make it more usable --- msfinance/stocks.py | 214 ++++++++++++++++++++++++++++---------------- 1 file changed, 135 insertions(+), 79 deletions(-) diff --git a/msfinance/stocks.py b/msfinance/stocks.py index 4760fe0..65b332f 100644 --- a/msfinance/stocks.py +++ b/msfinance/stocks.py @@ -28,6 +28,7 @@ from sqlalchemy import create_engine from sqlalchemy.orm import sessionmaker +from selenium_stealth import stealth import undetected_chromedriver as uc # For Chrome driver @@ -47,56 +48,16 @@ } class StockBase: - def __init__(self, debug=False, browser='chrome', database='msfinance.db3', session_factory=None, proxy=None): + def __init__(self, debug=False, browser='chrome', database='msfinance.db3', session_factory=None, proxy=None, driver_type='uc'): self.debug = debug # Initialize UserAgent for random user-agent generation self.ua = UserAgent() - if('chrome' == browser): - # Chrome support - self.options = webdriver.ChromeOptions() - - # Set a random user-agent - self.options.add_argument(f"--user-agent={self.ua.random}") - - # Setting download directory - self.download_dir = os.path.join(tempfile.gettempdir(), 'msfinance', str(os.getpid())) - if debug: - print(f"Download directory: {self.download_dir}") - - if not os.path.exists(self.download_dir): - os.makedirs(self.download_dir) - - # Use headless mode - if not debug: - self.options.add_argument("--headless") - else: - self.options.add_argument("--start-maximized") - self.options.add_argument("--disable-popup-blocking") - - if proxy: - [protocol, host, port] = re.split(r'://|:', proxy) - if 'socks5' == protocol: - self.options.add_argument(f'--proxy-server=socks5://{host}:{port}') - else: - print("No supported proxy protocol") - exit(1) - - # Initialize the undetected_chromedriver - self.driver = uc.Chrome( - version_main=126, - use_subprocess=False, - service=webdriver.ChromeService(ChromeDriverManager(driver_version='126').install()), - options=self.options) - - # Change download directory - params = { - "behavior": "allow", - "downloadPath": self.download_dir, - } - self.driver.execute_cdp_cmd("Page.setDownloadBehavior", params) + self.driver_type = driver_type + if browser == 'chrome': + self.setup_chrome_driver(proxy) else: # Default: firefox self.options = webdriver.FirefoxOptions() @@ -158,20 +119,55 @@ def __init__(self, debug=False, browser='chrome', database='msfinance.db3', sess "https": proxy, } - # Open Morningstar home page - url = f"https://www.morningstar.com" - self.driver.get(url) - # Wait for the page to load, and pass the human verification - time.sleep(30) - + # Open Morningstar stock page url = f"https://www.morningstar.com/stocks" self.driver.get(url) - time.sleep(10) + time.sleep(20) def __del__(self): if not self.debug: self.driver.quit() + def setup_chrome_driver(self, proxy): + # Chrome support + self.options = webdriver.ChromeOptions() + + # Set a random user-agent + self.options.add_argument(f"--user-agent={self.ua.random}") + + # Setting download directory + self.download_dir = os.path.join(tempfile.gettempdir(), 'msfinance', str(os.getpid())) + if self.debug: + print(f"Download directory: {self.download_dir}") + + if not os.path.exists(self.download_dir): + os.makedirs(self.download_dir) + + # Use headless mode + if not self.debug: + self.options.add_argument("--headless") + else: + self.options.add_argument("--start-maximized") + self.options.add_argument("--disable-popup-blocking") + + if proxy: + [protocol, host, port] = re.split(r'://|:', proxy) + if 'socks5' == protocol: + self.options.add_argument(f'--proxy-server=socks5://{host}:{port}') + else: + print("No supported proxy protocol") + exit(1) + + # Initialize the undetected_chromedriver + self.initialize_driver() + + # Change download directory + params = { + "behavior": "allow", + "downloadPath": self.download_dir, + } + self.driver.execute_cdp_cmd("Page.setDownloadBehavior", params) + def _check_database(self, unique_id): ''' Check database if table with unique_id exists, and return it as a DataFrame @@ -219,36 +215,38 @@ def _human_delay(self, min=3, max=15): def _random_mouse_move(self): '''Simulate random mouse movement''' + if self.debug: + print("Simulate random mouse movement") actions = ActionChains(self.driver) - # Get window size - window_size = self.driver.get_window_size() - # Random start position - start_x = random.randint(0, window_size['width'] - 1) - start_y = random.randint(0, window_size['height'] - 1) - actions.move_by_offset(start_x, start_y).perform() - self._human_delay(0.5, 1.5) - # Random end position - end_x = random.randint(0, window_size['width'] - 1) - end_y = random.randint(0, window_size['height'] - 1) - actions.move_by_offset(end_x, end_y).perform() - self._human_delay(0.5, 1.5) - + + element = self.driver.find_element(By.TAG_NAME, 'body') + target_x = random.randint(100, 200) + target_y = random.randint(100, 200) + if self.debug: + print(f"Target position: {target_x}, {target_y}") + actions.move_to_element_with_offset(element, target_x, target_y).perform() + self._human_delay(1, 5) + def _random_scroll(self): '''Simulate random page scrolling''' + if self.debug: + print("Simulate random page scrolling") scroll_height = self.driver.execute_script("return document.body.scrollHeight") - random_position = random.randint(0, scroll_height) + random_position = random.randint(5, scroll_height>>2 + 1) self.driver.execute_script(f"window.scrollTo(0, {random_position});") - self._human_delay(0.5, 1.5) + self._human_delay(1, 5) def _random_click(self): '''Simulate random click on a blank area of the page''' + if self.debug: + print("Simulate random click on a blank area of the page") window_size = self.driver.get_window_size() - x = random.randint(0, window_size['width']) - y = random.randint(0, window_size['height']) + x = random.randint(0, window_size['width'] - 100) + y = random.randint(0, window_size['height'] - 100) try: actions = ActionChains(self.driver) actions.move_by_offset(x, y).click().perform() - self._human_delay(1, 2) + self._human_delay(1, 5) # Reset mouse position actions.move_by_offset(-x, -y).perform() except Exception: @@ -256,11 +254,13 @@ def _random_click(self): def _random_typing(self, element, text): '''Simulate random keyboard typing''' + if self.debug: + print(f"Simulate random keyboard typing: {text}") for char in text: element.send_keys(char) time.sleep(random.uniform(0.05, 0.3)) # Random delay between each character - @retry(wait_fixed=2000, stop_max_attempt_number=3) + @retry(wait_random_min=1000, wait_random_max=5000, stop_max_attempt_number=3) def _get_valuation(self, ticker, exchange, statistics, update=False): # Compose a unique ID for database table and file name @@ -281,7 +281,9 @@ def _get_valuation(self, ticker, exchange, statistics, update=False): self._human_delay() self._random_scroll() - statistics_button = self.driver.find_element(By.XPATH, f"//button[contains(., '{statistics}')]") + statistics_button = WebDriverWait(self.driver, 30).until( + EC.visibility_of_element_located((By.XPATH, f"//button[contains(., '{statistics}')]")) + ) statistics_button.click() # More human-like operations @@ -326,7 +328,7 @@ def _get_valuation(self, ticker, exchange, statistics, update=False): return df - @retry(wait_fixed=2000, stop_max_attempt_number=3) + @retry(wait_random_min=1000, wait_random_max=5000, stop_max_attempt_number=3) def _get_financials(self, ticker, exchange, statement, period='Annual', stage='Restated', update=False): # Compose a unique ID for database table and file name @@ -348,7 +350,9 @@ def _get_financials(self, ticker, exchange, statement, period='Annual', stage='R self._random_scroll() # Select statement type - type_button = self.driver.find_element(By.XPATH, f"//button[contains(., '{statement}')]") + type_button = WebDriverWait(self.driver, 30).until( + EC.visibility_of_element_located((By.XPATH, f"//button[contains(., '{statement}')]")) + ) type_button.click() # More human-like operations @@ -357,7 +361,9 @@ def _get_financials(self, ticker, exchange, statement, period='Annual', stage='R self._random_scroll() # Select statement period - period_list_button = self.driver.find_element(By.XPATH, "//button[contains(., 'Annual') and @aria-haspopup='true']") + period_list_button = WebDriverWait(self.driver, 30).until( + EC.visibility_of_element_located((By.XPATH, "//button[contains(., 'Annual') and @aria-haspopup='true']")) + ) try: period_list_button.click() self._human_delay() @@ -365,9 +371,13 @@ def _get_financials(self, ticker, exchange, statement, period='Annual', stage='R pass if 'Annual' == period: - period_button = self.driver.find_element(By.XPATH, "//span[contains(., 'Annual') and @class='mds-list-group__item-text__sal']") + period_button = WebDriverWait(self.driver, 30).until( + EC.visibility_of_element_located((By.XPATH, "//span[contains(., 'Annual') and @class='mds-list-group__item-text__sal']")) + ) else: - period_button = self.driver.find_element(By.XPATH, "//span[contains(., 'Quarterly') and @class='mds-list-group__item-text__sal']") + period_button = WebDriverWait(self.driver, 30).until( + EC.visibility_of_element_located((By.XPATH, "//span[contains(., 'Quarterly') and @class='mds-list-group__item-text__sal']")) + ) try: period_button.click() @@ -376,7 +386,9 @@ def _get_financials(self, ticker, exchange, statement, period='Annual', stage='R pass # Select statement stage - stage_list_button = self.driver.find_element(By.XPATH, "//button[contains(., 'As Originally Reported') and @aria-haspopup='true']") + stage_list_button = WebDriverWait(self.driver, 30).until( + EC.visibility_of_element_located((By.XPATH, "//button[contains(., 'As Originally Reported') and @aria-haspopup='true']")) + ) try: stage_list_button.click() self._human_delay() @@ -386,9 +398,13 @@ def _get_financials(self, ticker, exchange, statement, period='Annual', stage='R pass if 'As Originally Reported' == stage: - stage_button = self.driver.find_element(By.XPATH, "//span[contains(., 'As Originally Reported') and @class='mds-list-group__item-text__sal']") + stage_button = WebDriverWait(self.driver, 30).until( + EC.visibility_of_element_located((By.XPATH, "//span[contains(., 'As Originally Reported') and @class='mds-list-group__item-text__sal']")) + ) else: - stage_button = self.driver.find_element(By.XPATH, "//span[contains(., 'Restated') and @class='mds-list-group__item-text__sal']") + stage_button = WebDriverWait(self.driver, 30).until( + EC.visibility_of_element_located((By.XPATH, "//span[contains(., 'Restated') and @class='mds-list-group__item-text__sal']")) + ) try: stage_button.click() @@ -462,6 +478,41 @@ def _get_us_exchange_tickers(self, exchange, update=False): symbols = df['symbol'].tolist() return symbols + def initialize_driver(self): + # Initialize the driver based on the driver_type + if self.driver_type == 'uc': + self.driver = uc.Chrome( + version_main=126, + use_subprocess=False, + service=webdriver.ChromeService(ChromeDriverManager(driver_version='126').install()), + options=self.options) + elif self.driver_type == 'stealth': + # Initialize the WebDriver (e.g., Chrome) + self.driver = webdriver.Chrome( + service=webdriver.ChromeService(ChromeDriverManager(driver_version='126').install()), + options=self.options) + + # Apply selenium-stealth to the WebDriver + stealth(self.driver, + languages=["en-US", "en"], + vendor="Google Inc.", + platform="Win32", + webgl_vendor="Intel Inc.", + renderer="Intel Iris OpenGL Engine", + fix_hairline=True, + ) + else: + raise ValueError("Invalid driver type specified") + + def check_for_bot_confirmation(self): + '''Check if the page contains the string "Let's confirm you aren't a bot"''' + try: + # Use XPath to search for the text in the entire page + self.driver.find_element(By.XPATH, "//*[contains(text(), \"Let's confirm you aren't a bot\")]") + return True + except NoSuchElementException: + return False + # End of class StockBase class Stock(StockBase): @@ -666,3 +717,8 @@ def get_xase_tickers(self): # End of class Stock + + + + +