From 8d1cde19bb9784f6d824c48fb08ac847f52389ee Mon Sep 17 00:00:00 2001 From: Jimmy Situ Date: Sat, 18 Jan 2025 22:08:37 +0800 Subject: [PATCH] Update stocks for key metrics --- msfinance/stocks.py | 295 ++++++++++++++++++++++++++------------------ 1 file changed, 178 insertions(+), 117 deletions(-) diff --git a/msfinance/stocks.py b/msfinance/stocks.py index 1a020cf..1879437 100644 --- a/msfinance/stocks.py +++ b/msfinance/stocks.py @@ -7,6 +7,7 @@ import tempfile import logging import glob +import multiprocessing import pandas as pd @@ -54,12 +55,7 @@ class StockBase: def __init__(self, debug=False, browser='chrome', database='msfinance.db3', session_factory=None, proxy=None, driver_type='uc'): self.debug = debug - self.logger = logging.getLogger(self.__class__.__name__) - handler = logging.StreamHandler() - formatter = logging.Formatter('%(levelname)s:%(name)s: %(message)s') - handler.setFormatter(formatter) - self.logger.addHandler(handler) - self.logger.setLevel(logging.DEBUG if self.debug else logging.INFO) + self.setup_logger() # Initialize UserAgent for random user-agent generation self.ua = UserAgent() @@ -82,10 +78,10 @@ def __init__(self, debug=False, browser='chrome', database='msfinance.db3', sess self.Session = session_factory else: # Setup SQLAlchemy engine and session - self.engine = create_engine(f'sqlite:///{database}', pool_size=5, max_overflow=10) + self.engine = create_engine( + f'sqlite:///{database}', pool_size=5, max_overflow=10) self.Session = sessionmaker(bind=self.engine) - # Setup proxies for requests self.proxies = { "http": proxy, @@ -103,25 +99,46 @@ def __del__(self): if not self.debug: self.driver.quit() + def setup_logger(self): + # Get the current process name + process_name = multiprocessing.current_process().name + + # Use the process name and id of the instance to create a unique logger name + logger_name = f"{process_name}.{self.__class__.__name__}" + self.logger = logging.getLogger(logger_name) + + # Check if the logger has handlers already to avoid duplicate handlers + if not self.logger.hasHandlers(): + formatter = logging.Formatter( + '%(processName)s:%(levelname)s:%(name)s: %(message)s') + handler = logging.StreamHandler() + handler.setFormatter(formatter) + self.logger.addHandler(handler) + + # Override the logger level + self.logger.setLevel(logging.DEBUG if self.debug else logging.INFO) + def reset_driver(self, retry_state=None): '''Reset the driver''' if retry_state is not None: self.logger.info("Retry State Information:") self.logger.info(f" Attempt number: {retry_state.attempt_number}") - + try: - self.logger.info(f" Last result: {retry_state.outcome.result()}") + self.logger.info( + f" Last result: {retry_state.outcome.result()}") except Exception as e: self.logger.info(f" Last result: {e}") - + try: - self.logger.info(f" Last exception: {retry_state.outcome.exception()}") + self.logger.info( + f" Last exception: {retry_state.outcome.exception()}") except Exception as e: self.logger.info(f" Last exception: {e}") - - self.logger.info(f" Time elapsed: {retry_state.seconds_since_start}") - + + self.logger.info( + f" Time elapsed: {retry_state.seconds_since_start}") # Setup a new driver instance if isinstance(self.driver, (webdriver.Chrome, uc.Chrome)): @@ -131,7 +148,6 @@ def reset_driver(self, retry_state=None): self.driver.quit() self.setup_firefox_driver(self.proxies['http']) - def setup_chrome_driver(self, proxy): # Chrome support self.options = webdriver.ChromeOptions() @@ -140,7 +156,8 @@ def setup_chrome_driver(self, proxy): self.options.add_argument(f"--user-agent={self.ua.random}") # Setting download directory - self.download_dir = os.path.join(tempfile.gettempdir(), 'msfinance', str(os.getpid())) + self.download_dir = os.path.join( + tempfile.gettempdir(), 'msfinance', str(os.getpid())) self.logger.debug(f"Download directory: {self.download_dir}") if not os.path.exists(self.download_dir): @@ -156,7 +173,8 @@ def setup_chrome_driver(self, proxy): if proxy is not None: [protocol, host, port] = re.split(r'://|:', proxy) if 'socks5' == protocol: - self.options.add_argument(f'--proxy-server=socks5://{host}:{port}') + self.options.add_argument( + f'--proxy-server=socks5://{host}:{port}') else: self.logger.error("No supported proxy protocol") exit(1) @@ -164,7 +182,7 @@ def setup_chrome_driver(self, proxy): # Initialize the undetected_chromedriver self.initialize_chrome_driver() - # Override the webdriver property, make more undetected + # Override the webdriver property, make more undetected self.driver.execute_cdp_cmd("Page.addScriptToEvaluateOnNewDocument", { "source": """ Object.defineProperty(navigator, 'webdriver', { @@ -184,7 +202,8 @@ def setup_firefox_driver(self, proxy): self.options = webdriver.FirefoxOptions() # Setting download directory - self.download_dir = os.path.join(tempfile.gettempdir(), 'msfinance', str(os.getpid())) + self.download_dir = os.path.join( + tempfile.gettempdir(), 'msfinance', str(os.getpid())) self.logger.debug(f"Download directory: {self.download_dir}") if not os.path.exists(self.download_dir): @@ -193,16 +212,20 @@ def setup_firefox_driver(self, proxy): self.options.set_preference("browser.download.folderList", 2) self.options.set_preference("browser.download.dir", self.download_dir) self.options.set_preference("browser.download.useDownloadDir", True) - self.options.set_preference("browser.download.viewableInternally.enabledTypes", "") - self.options.set_preference("browser.download.manager.showWhenStarting", False) - self.options.set_preference("browser.helperApps.neverAsk.saveToDisk", "application/octet-stream") + self.options.set_preference( + "browser.download.viewableInternally.enabledTypes", "") + self.options.set_preference( + "browser.download.manager.showWhenStarting", False) + self.options.set_preference( + "browser.helperApps.neverAsk.saveToDisk", "application/octet-stream") # Enable cache self.options.set_preference("browser.cache.disk.enable", True) self.options.set_preference("browser.cache.memory.enable", True) self.options.set_preference("browser.cache.offline.enable", True) self.options.set_preference("network.http.use-cache", True) - self.options.set_preference("general.useragent.override", self.ua.random) + self.options.set_preference( + "general.useragent.override", self.ua.random) # Use headless mode if not self.debug: @@ -214,9 +237,11 @@ def setup_firefox_driver(self, proxy): self.options.set_preference('network.proxy.type', 1) if 'socks5' == protocol: self.options.set_preference('network.proxy.socks', host) - self.options.set_preference('network.proxy.socks_port', int(port)) + self.options.set_preference( + 'network.proxy.socks_port', int(port)) self.options.set_preference('network.proxy.socks_version', 5) - self.options.set_preference('network.proxy.socks_remote_dns', True) + self.options.set_preference( + 'network.proxy.socks_remote_dns', True) else: self.logger.error("No supported proxy protocol") exit(1) @@ -260,7 +285,8 @@ def _update_database(self, unique_id, df): session = self.Session() try: df['Last Updated'] = datetime.now() - df.to_sql(unique_id, session.bind, if_exists='replace', index=False) + df.to_sql(unique_id, session.bind, + if_exists='replace', index=False) return True finally: session.close() @@ -272,21 +298,25 @@ def _human_delay(self, min=3, max=15): def _random_mouse_move(self): '''Simulate random mouse movement''' actions = ActionChains(self.driver) - + element = self.driver.find_element(By.TAG_NAME, 'body') target_x = random.randint(100, 200) target_y = random.randint(100, 200) if self.debug: - self.logger.debug(f"Simulate random mouse movement, target position: {target_x}, {target_y}") - actions.move_to_element_with_offset(element, target_x, target_y).perform() + self.logger.debug( + f"Simulate random mouse movement, target position: {target_x}, {target_y}") + actions.move_to_element_with_offset( + element, target_x, target_y).perform() self._human_delay(1, 5) - + def _random_scroll(self): '''Simulate random page scrolling''' - - scroll_height = self.driver.execute_script("return document.body.scrollHeight") - random_position = random.randint(scroll_height>>1, scroll_height) - self.logger.debug(f"Simulate random page scrolling, target position: {random_position}") + + scroll_height = self.driver.execute_script( + "return document.body.scrollHeight") + random_position = random.randint(scroll_height >> 1, scroll_height) + self.logger.debug( + f"Simulate random page scrolling, target position: {random_position}") self.driver.execute_script(f"window.scrollTo(0, {random_position});") self._human_delay(1, 5) @@ -295,10 +325,11 @@ def _random_typing(self, element, text): self.logger.debug(f"Simulate random keyboard typing: {text}") for char in text: element.send_keys(char) - time.sleep(random.uniform(0.05, 0.3)) # Random delay between each character + # Random delay between each character + time.sleep(random.uniform(0.05, 0.3)) + + def _get_key_metrics(self, ticker, exchange, statistics, stage='Restated', update=False): - def _get_key_metrics(self, ticker, exchange, statistics, update=False): - @retry( wait=wait_random(min=60, max=120), stop=stop_after_attempt(3), @@ -306,7 +337,8 @@ def _get_key_metrics(self, ticker, exchange, statistics, update=False): ) def _get_key_metrics_retry(): # Compose a unique ID for database table and file name - unique_id = f"{ticker}_{exchange}_{statistics}".replace(' ', '_').lower() + unique_id = f"{ticker}_{exchange}_{statistics}_{stage}".replace( + ' ', '_').lower() # Not force to update, check database first if not update: @@ -317,65 +349,105 @@ def _get_key_metrics_retry(): # Fetch data from website starts here url = f"https://www.morningstar.com/stocks/{exchange}/{ticker}/key-metrics" self.driver.get(url) - + # Simulate human-like operations self._random_mouse_move() self._human_delay() self._random_scroll() - + statistics_button = WebDriverWait(self.driver, 30).until( - EC.visibility_of_element_located((By.XPATH, f"//button[contains(., '{statistics}')]")) + EC.visibility_of_element_located( + (By.XPATH, f"//button[contains(., '{statistics}')]")) ) statistics_button.click() - + # More human-like operations self._human_delay() self._random_scroll() + + # Only 'Financial Summary' has stage selection + if 'Financial Summary' == statistics: + # Select metrics stage + stage_list_button = WebDriverWait(self.driver, 30).until( + EC.visibility_of_element_located( + (By.XPATH, "//button[contains(., 'As Originally Reported') and @aria-haspopup='true']")) + ) + try: + stage_list_button.click() + self._human_delay() + except ElementClickInterceptedException: + pass + except ElementNotInteractableException: + pass + + if 'As Originally Reported' == stage: + stage_button = WebDriverWait(self.driver, 30).until( + EC.visibility_of_element_located( + (By.XPATH, "//span[contains(., 'As Originally Reported') and @class='mds-list-group-item__text__sal']")) + ) + else: + stage_button = WebDriverWait(self.driver, 30).until( + EC.visibility_of_element_located( + (By.XPATH, "//span[contains(., 'Restated') and @class='mds-list-group-item__text__sal']")) + ) + try: + stage_button.click() + self._human_delay() + except ElementClickInterceptedException: + pass + except ElementNotInteractableException: + pass + else: + pass + + # Export data export_button = WebDriverWait(self.driver, 30).until( - EC.visibility_of_element_located((By.XPATH, '//*[@id="salKeyStatsPopoverExport"]')) + EC.visibility_of_element_located( + (By.XPATH, '//*[@id="salKeyStatsPopoverExport"]')) ) - + # Check if there is no such data available try: WebDriverWait(self.driver, 5).until( EC.visibility_of_element_located( - (By.XPATH, f"//div[contains(., 'There is no {statistics} data available.')]") + (By.XPATH, + f"//div[contains(., 'There is no {statistics} data available.')]") ) ) return None except TimeoutException: export_button.click() - + # Wait for download to complete tmp_string = statistics_filename[statistics] # Use wildcard to match the file name pattern = os.path.join(self.download_dir, f"{tmp_string}*.xls") - + retries = 10 downloaded_files = glob.glob(pattern) while retries and (not downloaded_files or os.path.getsize(downloaded_files[0]) == 0): time.sleep(1) retries -= 1 downloaded_files = glob.glob(pattern) - + if not downloaded_files: raise ValueError("Export data fail") - + tmp_file = downloaded_files[0] statistics_file = self.download_dir + f"/{unique_id}.xls" os.rename(tmp_file, statistics_file) time.sleep(1) - + # Update database df = pd.read_excel(statistics_file) self._update_database(unique_id, df) - + return df return _get_key_metrics_retry() - + def _get_financials(self, ticker, exchange, statement, period='Annual', stage='Restated', update=False): @retry( @@ -385,62 +457,68 @@ def _get_financials(self, ticker, exchange, statement, period='Annual', stage='R ) def _get_financials_retry(): # Compose a unique ID for database table and file name - unique_id = f"{ticker}_{exchange}_{statement}_{period}_{stage}".replace(' ', '_').lower() - + unique_id = f"{ticker}_{exchange}_{statement}_{period}_{stage}".replace( + ' ', '_').lower() + # Not force to update, check database first if not update: df = self._check_database(unique_id) if df is not None: return df - + # Fetch data from website starts here url = f"https://www.morningstar.com/stocks/{exchange}/{ticker}/financials" self.driver.get(url) - + # Simulate human-like operations self._random_mouse_move() self._human_delay() self._random_scroll() - + # Select statement type type_button = WebDriverWait(self.driver, 30).until( - EC.visibility_of_element_located((By.XPATH, f"//button[contains(., '{statement}')]")) + EC.visibility_of_element_located( + (By.XPATH, f"//button[contains(., '{statement}')]")) ) type_button.click() - + # More human-like operations self._random_scroll() self._human_delay() self._random_mouse_move() - + # Select statement period period_list_button = WebDriverWait(self.driver, 30).until( - EC.visibility_of_element_located((By.XPATH, "//button[contains(., 'Annual') and @aria-haspopup='true']")) + EC.visibility_of_element_located( + (By.XPATH, "//button[contains(., 'Annual') and @aria-haspopup='true']")) ) try: period_list_button.click() self._human_delay() except ElementClickInterceptedException: pass - + if 'Annual' == period: period_button = WebDriverWait(self.driver, 30).until( - EC.visibility_of_element_located((By.XPATH, "//span[contains(., 'Annual') and @class='mds-list-group-item__text__sal']")) + EC.visibility_of_element_located( + (By.XPATH, "//span[contains(., 'Annual') and @class='mds-list-group-item__text__sal']")) ) else: period_button = WebDriverWait(self.driver, 30).until( - EC.visibility_of_element_located((By.XPATH, "//span[contains(., 'Quarterly') and @class='mds-list-group-item__text__sal']")) + EC.visibility_of_element_located( + (By.XPATH, "//span[contains(., 'Quarterly') and @class='mds-list-group-item__text__sal']")) ) - + try: period_button.click() self._human_delay() except ElementClickInterceptedException: pass - + # Select statement stage stage_list_button = WebDriverWait(self.driver, 30).until( - EC.visibility_of_element_located((By.XPATH, "//button[contains(., 'As Originally Reported') and @aria-haspopup='true']")) + EC.visibility_of_element_located( + (By.XPATH, "//button[contains(., 'As Originally Reported') and @aria-haspopup='true']")) ) try: stage_list_button.click() @@ -449,7 +527,7 @@ def _get_financials_retry(): pass except ElementNotInteractableException: pass - + if 'As Originally Reported' == stage: stage_button = WebDriverWait(self.driver, 30).until( EC.visibility_of_element_located( @@ -460,7 +538,7 @@ def _get_financials_retry(): EC.visibility_of_element_located( (By.XPATH, "//span[contains(., 'Restated') and @class='mds-list-group-item__text__sal']")) ) - + try: stage_button.click() self._human_delay() @@ -468,40 +546,40 @@ def _get_financials_retry(): pass except ElementNotInteractableException: pass - + # More human-like operations self._random_mouse_move() self._human_delay() self._random_scroll() export_button = WebDriverWait(self.driver, 30).until( - EC.visibility_of_element_located((By.XPATH, '//*[@id="salEqsvFinancialsPopoverExport"]')) + EC.visibility_of_element_located( + (By.XPATH, '//*[@id="salEqsvFinancialsPopoverExport"]')) ) export_button.click() - + retries = 5 # Wait for download to complete tmp_file = self.download_dir + f"/{statement}_{period}_{stage}.xls" while retries and (not os.path.exists(tmp_file)): time.sleep(1) retries = retries - 1 - + if 0 == retries and (not os.path.exists(tmp_file)): raise ValueError("Export data fail") - + statement_file = self.download_dir + f"/{unique_id}.xls" os.rename(tmp_file, statement_file) time.sleep(1) - + # Update database df = pd.read_excel(statement_file) self._update_database(unique_id, df) - + return df - + return _get_financials_retry() - - + def _get_us_exchange_tickers(self, exchange, update=False): unique_id = f"us_exchange_{exchange}_tickers" @@ -518,7 +596,7 @@ def _get_us_exchange_tickers(self, exchange, update=False): 'accept': 'application/json, text/plain, */*', 'user-agent': self.ua.random, # Use random user-agent } - url=f'https://api.nasdaq.com/api/screener/stocks?tableonly=true&exchange={exchange}&download=true' + url = f'https://api.nasdaq.com/api/screener/stocks?tableonly=true&exchange={exchange}&download=true' response = requests.get(url, headers=headers) tmp_data = json.loads(response.text) @@ -539,25 +617,27 @@ def initialize_chrome_driver(self): version_main=126, use_subprocess=True, user_multi_procs=True, - service=webdriver.ChromeService(ChromeDriverManager(driver_version='126').install()), + service=webdriver.ChromeService( + ChromeDriverManager(driver_version='126').install()), debug=self.debug, ) elif self.driver_type == 'stealth': # Initialize the WebDriver (e.g., Chrome) self.driver = webdriver.Chrome( - service=webdriver.ChromeService(ChromeDriverManager(driver_version='126').install()), + service=webdriver.ChromeService( + ChromeDriverManager(driver_version='126').install()), options=self.options, ) - + # Apply selenium-stealth to the WebDriver stealth(self.driver, - languages=["en-US", "en"], - vendor="Google Inc.", - platform="Win32", - webgl_vendor="Intel Inc.", - renderer="Intel Iris OpenGL Engine", - fix_hairline=True, - ) + languages=["en-US", "en"], + vendor="Google Inc.", + platform="Win32", + webgl_vendor="Intel Inc.", + renderer="Intel Iris OpenGL Engine", + fix_hairline=True, + ) else: raise ValueError("Invalid driver type specified") @@ -565,17 +645,20 @@ def check_for_bot_confirmation(self): '''Check if the page contains the string "Let's confirm you aren't a bot"''' try: # Use XPath to search for the text in the entire page - self.driver.find_element(By.XPATH, "//*[contains(text(), \"Let's confirm you aren't a bot\")]") + self.driver.find_element( + By.XPATH, "//*[contains(text(), \"Let's confirm you aren't a bot\")]") return True except NoSuchElementException: return False # End of class StockBase + class Stock(StockBase): ''' Get stock financials statements and key metrics statistics ''' + def get_financial_summary(self, ticker, exchange, update=False): ''' Get financial summary statistics of stock @@ -655,7 +738,7 @@ def get_key_metrics(self, ticker, exchange, update=False): ''' self.key_metrics = [] - for statistics in ['Financial Summary', 'Growth', 'Profitability and Efficiency', 'Financial Health','Cash Flow']: + for statistics in ['Financial Summary', 'Growth', 'Profitability and Efficiency', 'Financial Health', 'Cash Flow']: df = self._get_key_metrics(ticker, exchange, statistics, update) self.key_metrics.append(df) @@ -721,7 +804,8 @@ def get_financials(self, ticker, exchange, period='Annual', stage='As Originally self.financials = [] for statement in ['Income Statement', 'Balance Sheet', 'Cash Flow']: - df = self._get_financials(ticker, exchange, statement, period, stage, update) + df = self._get_financials( + ticker, exchange, statement, period, stage, update) self.financials.append(df) return self.financials @@ -741,7 +825,6 @@ def get_hsi_tickers(self): symbols = [s[pfx_len:].zfill(5) for s in symbols] return symbols - def get_sp500_tickers(self): ''' Get tickers of SP500 @@ -786,25 +869,3 @@ def get_xase_tickers(self): return self._get_us_exchange_tickers(exchange) # End of class Stock - - - - - - - - - - - - - - - - - - - - - -