diff --git a/linkedin_scraper/company.py b/linkedin_scraper/company.py
index f731272..1abc95d 100644
--- a/linkedin_scraper/company.py
+++ b/linkedin_scraper/company.py
@@ -9,6 +9,7 @@ from .person import Person
 import time
 import os
+from urllib.parse import quote_plus
 import json
 
 AD_BANNER_CLASSNAME = ('ad-banner-container', '__ad')
 
@@ -49,7 +50,7 @@ class Company(Scraper):
     employees = []
     headcount = None
 
-    def __init__(self, linkedin_url = None, name = None, about_us =None, website = None, phone = None, headquarters = None, founded = None, industry = None, company_type = None, company_size = None, specialties = None, showcase_pages =[], affiliated_companies = [], driver = None, scrape = True, get_employees = True, close_on_complete = True):
+    def __init__(self, linkedin_url = None, name = None, about_us =None, website = None, phone = None, headquarters = None, founded = None, industry = None, company_type = None, company_size = None, specialties = None, showcase_pages =[], affiliated_companies = [], driver = None, scrape = True, get_employees = True, close_on_complete = True, employee_search_keywords = None, timeout = 10):
         self.linkedin_url = linkedin_url
         self.name = name
         self.about_us = about_us
@@ -63,6 +64,12 @@ def __init__(self, linkedin_url = None, name = None, about_us =None, website = None, phone = None, headquarters = None, founded = None, industry = None, company_type = None, company_size = None, specialties = None, showcase_pages =[], affiliated_companies = [], driver = None, scrape = True, get_employees = True, close_on_complete = True):
         self.specialties = specialties
         self.showcase_pages = showcase_pages
         self.affiliated_companies = affiliated_companies
+        self.employee_search_keywords = employee_search_keywords
+        self.timeout = timeout
+
+        # Validation: Check if keywords provided but get_employees is False
+        if employee_search_keywords and not get_employees:
+            raise ValueError("Cannot use employee_search_keywords when get_employees=False. Set get_employees=True to filter employees by keywords.")
 
         if driver is None:
             try:
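Reviewer note: the two new constructor arguments compose as below. A minimal usage sketch, assuming the package's usual Chrome driver plus `actions.login` flow from the README; the URL and credentials are placeholders:

```python
from selenium import webdriver
from linkedin_scraper import Company, actions

driver = webdriver.Chrome()
actions.login(driver, "email@example.com", "password")  # placeholder credentials

# Scrape a company and keep only employees matching the keyword search.
company = Company(
    "https://www.linkedin.com/company/example",  # placeholder URL
    driver=driver,
    get_employees=True,
    employee_search_keywords=["machine learning", "engineer"],
    timeout=15,  # now used by every WebDriverWait in this class
)

# The new validation raises ValueError, since keywords require get_employees=True:
# Company(..., get_employees=False, employee_search_keywords=["engineer"])
```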
@@ -118,9 +125,10 @@ def __parse_employee__(self, employee_raw):
             # print(e)
             return None
 
-    def get_employees(self, wait_time=10):
+
+    def get_employees(self, keywords=None):
         total = []
-        list_css = "list-style-none"
+        employee_xpath = '//div[contains(@class, "artdeco-entity-lockup")]'
         next_xpath = '//button[@aria-label="Next"]'
 
         driver = self.driver
@@ -128,43 +136,56 @@
             see_all_employees = driver.find_element(By.XPATH,'//a[@data-control-name="topcard_see_all_employees"]')
         except:
             pass
-        driver.get(os.path.join(self.linkedin_url, "people"))
+
+        # Construct URL with keyword search if provided
+        people_url = f"{self.linkedin_url}/people"
+        if keywords:
+            # Join keywords and URL-encode them
+            keyword_string = " ".join(keywords) if isinstance(keywords, list) else str(keywords)
+            encoded_keywords = quote_plus(keyword_string)
+            people_url = f"{people_url}/?keywords={encoded_keywords}"
+
+        driver.get(people_url)
 
-        _ = WebDriverWait(driver, 3).until(EC.presence_of_all_elements_located((By.XPATH, '//span[@dir="ltr"]')))
+        _ = WebDriverWait(driver, self.timeout).until(EC.presence_of_all_elements_located((By.XPATH, employee_xpath)))
 
         driver.execute_script("window.scrollTo(0, Math.ceil(document.body.scrollHeight/2));")
         time.sleep(1)
         driver.execute_script("window.scrollTo(0, Math.ceil(document.body.scrollHeight*3/4));")
         time.sleep(1)
 
-        results_list = driver.find_element(By.CLASS_NAME, list_css)
-        results_li = results_list.find_elements(By.TAG_NAME, "li")
-        for res in results_li:
-            total.append(self.__parse_employee__(res))
+        # Get employee elements directly
+        employee_elements = driver.find_elements(By.XPATH, employee_xpath)
+        for res in employee_elements:
+            employee = self.__parse_employee__(res)
+            if employee:
+                total.append(employee)
 
         def is_loaded(previous_results):
             loop = 0
             driver.execute_script("window.scrollTo(0, Math.ceil(document.body.scrollHeight));")
-            results_li = results_list.find_elements(By.TAG_NAME, "li")
-            while len(results_li) == previous_results and loop <= 5:
+            employee_elements = driver.find_elements(By.XPATH, employee_xpath)
+            while len(employee_elements) == previous_results and loop <= 5:
                 time.sleep(1)
                 driver.execute_script("window.scrollTo(0, Math.ceil(document.body.scrollHeight));")
-                results_li = results_list.find_elements(By.TAG_NAME, "li")
+                employee_elements = driver.find_elements(By.XPATH, employee_xpath)
                 loop += 1
             return loop <= 5
 
         def get_data(previous_results):
-            results_li = results_list.find_elements(By.TAG_NAME, "li")
-            for res in results_li[previous_results:]:
-                total.append(self.__parse_employee__(res))
-
-        results_li_len = len(results_li)
-        while is_loaded(results_li_len):
+            employee_elements = driver.find_elements(By.XPATH, employee_xpath)
+            for res in employee_elements[previous_results:]:
+                employee = self.__parse_employee__(res)
+                if employee:
+                    total.append(employee)
+
+        employee_count = len(employee_elements)
+        while is_loaded(employee_count):
             try:
                 driver.find_element(By.XPATH,next_xpath).click()
             except:
                 pass
-            _ = WebDriverWait(driver, wait_time).until(EC.presence_of_element_located((By.CLASS_NAME, list_css)))
+            _ = WebDriverWait(driver, self.timeout).until(EC.presence_of_element_located((By.XPATH, employee_xpath)))
 
             driver.execute_script("window.scrollTo(0, Math.ceil(document.body.scrollHeight/2));")
             time.sleep(1)
@@ -175,8 +196,8 @@ def get_data(previous_results):
             driver.execute_script("window.scrollTo(0, Math.ceil(document.body.scrollHeight));")
             time.sleep(1)
 
-            get_data(results_li_len)
-            results_li_len = len(total)
+            get_data(employee_count)
+            employee_count = len(total)
 
         return total
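For context, the keyword URL built in `get_employees` above boils down to the following. `build_people_url` is a hypothetical helper used only for illustration; note that `quote_plus` encodes spaces as `+`, and that unlike the old `os.path.join` call the f-string does not normalize a trailing slash on `linkedin_url`:

```python
from urllib.parse import quote_plus

def build_people_url(linkedin_url, keywords=None):
    # Hypothetical helper mirroring the logic in get_employees above.
    people_url = f"{linkedin_url}/people"
    if keywords:
        keyword_string = " ".join(keywords) if isinstance(keywords, list) else str(keywords)
        people_url = f"{people_url}/?keywords={quote_plus(keyword_string)}"
    return people_url

print(build_people_url("https://www.linkedin.com/company/example",
                       ["machine learning", "engineer"]))
# -> https://www.linkedin.com/company/example/people/?keywords=machine+learning+engineer
```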
@@ -186,7 +207,7 @@ def scrape_logged_in(self, get_employees = True, close_on_complete = True):
 
         driver.get(self.linkedin_url)
 
-        _ = WebDriverWait(driver, 3).until(EC.presence_of_all_elements_located((By.XPATH, '//div[@dir="ltr"]')))
+        _ = WebDriverWait(driver, self.timeout).until(EC.presence_of_all_elements_located((By.XPATH, '//div[@dir="ltr"]')))
 
         navigation = driver.find_element(By.CLASS_NAME, "org-page-navigation__items ")
 
@@ -201,7 +222,7 @@
         except:
             driver.get(os.path.join(self.linkedin_url, "about"))
 
-        _ = WebDriverWait(driver, 3).until(EC.presence_of_all_elements_located((By.TAG_NAME, 'section')))
+        _ = WebDriverWait(driver, self.timeout).until(EC.presence_of_all_elements_located((By.TAG_NAME, 'section')))
         time.sleep(3)
 
         if 'Cookie Policy' in driver.find_elements(By.TAG_NAME, "section")[1].text or any(classname in driver.find_elements(By.TAG_NAME, "section")[1].get_attribute('class') for classname in AD_BANNER_CLASSNAME):
@@ -255,9 +276,8 @@
 
         driver.execute_script("window.scrollTo(0, Math.ceil(document.body.scrollHeight/2));")
 
-
         try:
-            _ = WebDriverWait(driver, 3).until(EC.presence_of_element_located((By.CLASS_NAME, 'company-list')))
+            _ = WebDriverWait(driver, self.timeout).until(EC.presence_of_element_located((By.CLASS_NAME, 'company-list')))
 
             showcase, affiliated = driver.find_elements(By.CLASS_NAME, "company-list")
             driver.find_element(By.ID,"org-related-companies-module__show-more-btn").click()
@@ -284,7 +304,7 @@ def scrape_logged_in(self, get_employees = True, close_on_complete = True):
             pass
 
         if get_employees:
-            self.employees = self.get_employees()
+            self.employees = self.get_employees(keywords=self.employee_search_keywords)
 
         driver.get(self.linkedin_url)
 
@@ -313,7 +333,7 @@ def scrape_not_logged_in(self, close_on_complete = True, retry_limit = 10, get_employees = True):
         # get showcase
         try:
             driver.find_element(By.ID,"view-other-showcase-pages-dialog").click()
-            WebDriverWait(driver, 3).until(EC.presence_of_element_located((By.ID, 'dialog')))
+            WebDriverWait(driver, self.timeout).until(EC.presence_of_element_located((By.ID, 'dialog')))
 
             showcase_pages = driver.find_elements(By.CLASS_NAME, "company-showcase-pages")[1]
             for showcase_company in showcase_pages.find_elements(By.TAG_NAME, "li"):
@@ -344,7 +364,7 @@ def scrape_not_logged_in(self, close_on_complete = True, retry_limit = 10, get_employees = True):
             pass
 
         if get_employees:
-            self.employees = self.get_employees()
+            self.employees = self.get_employees(keywords=self.employee_search_keywords)
 
         driver.get(self.linkedin_url)
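All of the hard-coded `WebDriverWait(driver, 3)` call sites in company.py now honor `self.timeout`, which should reduce flaky timeouts on slow connections. For reference, the explicit-wait pattern being standardized looks like this; a sketch with a placeholder URL, reusing the new employee-card XPath from `get_employees`:

```python
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException

driver = webdriver.Chrome()
driver.get("https://www.linkedin.com/company/example/people/")  # placeholder

try:
    # Polls the DOM until at least one element matches, or raises
    # TimeoutException after 10 seconds (self.timeout in the class).
    cards = WebDriverWait(driver, 10).until(
        EC.presence_of_all_elements_located(
            (By.XPATH, '//div[contains(@class, "artdeco-entity-lockup")]')
        )
    )
except TimeoutException:
    cards = []
```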
diff --git a/linkedin_scraper/person.py b/linkedin_scraper/person.py
index fb077f1..3ab7194 100644
--- a/linkedin_scraper/person.py
+++ b/linkedin_scraper/person.py
@@ -116,17 +116,31 @@ def get_experiences(self):
         main_list = self.wait_for_element_to_load(name="pvs-list__container", base=main)
         for position in main_list.find_elements(By.CLASS_NAME, "pvs-list__paged-list-item"):
             position = position.find_element(By.CSS_SELECTOR, "div[data-view-name='profile-component-entity']")
-            company_logo_elem, position_details = position.find_elements(By.XPATH, "*")
+
+            # Fix: handle any number of child elements instead of assuming exactly 2
+            elements = position.find_elements(By.XPATH, "*")
+            if len(elements) < 2:
+                continue  # Skip if we don't have enough elements
+
+            company_logo_elem = elements[0]
+            position_details = elements[1]
 
             # company elem
-            company_linkedin_url = company_logo_elem.find_element(By.XPATH,"*").get_attribute("href")
-            if not company_linkedin_url:
+            try:
+                company_linkedin_url = company_logo_elem.find_element(By.XPATH,"*").get_attribute("href")
+                if not company_linkedin_url:
+                    continue
+            except NoSuchElementException:
                 continue
 
             # position details
             position_details_list = position_details.find_elements(By.XPATH,"*")
             position_summary_details = position_details_list[0] if len(position_details_list) > 0 else None
             position_summary_text = position_details_list[1] if len(position_details_list) > 1 else None
+
+            if not position_summary_details:
+                continue
+
             outer_positions = position_summary_details.find_element(By.XPATH,"*").find_elements(By.XPATH,"*")
 
             if len(outer_positions) == 4:
@@ -147,50 +161,71 @@
                 location = outer_positions[2].find_element(By.TAG_NAME,"span").text
             else:
                 position_title = ""
-                company = outer_positions[0].find_element(By.TAG_NAME,"span").text
-                work_times = ""
+                company = outer_positions[0].find_element(By.TAG_NAME,"span").text if outer_positions else ""
+                work_times = outer_positions[1].find_element(By.TAG_NAME,"span").text if len(outer_positions) > 1 else ""
                 location = ""
 
-            times = work_times.split("·")[0].strip() if work_times else ""
-            duration = work_times.split("·")[1].strip() if len(work_times.split("·")) > 1 else None
+            # Safely extract times and duration
+            if work_times:
+                parts = work_times.split("·")
+                times = parts[0].strip() if parts else ""
+                duration = parts[1].strip() if len(parts) > 1 else None
+            else:
+                times = ""
+                duration = None
 
             from_date = " ".join(times.split(" ")[:2]) if times else ""
-            to_date = " ".join(times.split(" ")[3:]) if times else ""
-            if position_summary_text and any(element.get_attribute("pvs-list__container") for element in position_summary_text.find_elements(By.TAG_NAME, "*")):
-                inner_positions = (position_summary_text.find_element(By.CLASS_NAME,"pvs-list__container")
-                                   .find_element(By.XPATH,"*").find_element(By.XPATH,"*").find_element(By.XPATH,"*")
-                                   .find_elements(By.CLASS_NAME,"pvs-list__paged-list-item"))
+            to_date = " ".join(times.split(" ")[3:]) if times and len(times.split(" ")) > 3 else ""
+
+            if position_summary_text and any(element.get_attribute("class") == "pvs-list__container" for element in position_summary_text.find_elements(By.XPATH, "*")):
+                try:
+                    inner_positions = (position_summary_text.find_element(By.CLASS_NAME,"pvs-list__container")
+                                       .find_element(By.XPATH,"*").find_element(By.XPATH,"*").find_element(By.XPATH,"*")
+                                       .find_elements(By.CLASS_NAME,"pvs-list__paged-list-item"))
+                except NoSuchElementException:
+                    inner_positions = []
             else:
                 inner_positions = []
+
             if len(inner_positions) > 1:
                 descriptions = inner_positions
                 for description in descriptions:
-                    res = description.find_element(By.TAG_NAME,"a").find_elements(By.XPATH,"*")
-                    position_title_elem = res[0] if len(res) > 0 else None
-                    work_times_elem = res[1] if len(res) > 1 else None
-                    location_elem = res[2] if len(res) > 2 else None
-
-
-                    location = location_elem.find_element(By.XPATH,"*").text if location_elem else None
-                    position_title = position_title_elem.find_element(By.XPATH,"*").find_element(By.TAG_NAME,"*").text if position_title_elem else ""
-                    work_times = work_times_elem.find_element(By.XPATH,"*").text if work_times_elem else ""
-                    times = work_times.split("·")[0].strip() if work_times else ""
-                    duration = work_times.split("·")[1].strip() if len(work_times.split("·")) > 1 else None
-                    from_date = " ".join(times.split(" ")[:2]) if times else ""
-                    to_date = " ".join(times.split(" ")[3:]) if times else ""
-
-                    experience = Experience(
-                        position_title=position_title,
-                        from_date=from_date,
-                        to_date=to_date,
-                        duration=duration,
-                        location=location,
-                        description=description,
-                        institution_name=company,
-                        linkedin_url=company_linkedin_url
-                    )
-                    self.add_experience(experience)
+                    try:
+                        res = description.find_element(By.TAG_NAME,"a").find_elements(By.XPATH,"*")
+                        position_title_elem = res[0] if len(res) > 0 else None
+                        work_times_elem = res[1] if len(res) > 1 else None
+                        location_elem = res[2] if len(res) > 2 else None
+
+                        location = location_elem.find_element(By.XPATH,"*").text if location_elem else None
+                        position_title = position_title_elem.find_element(By.XPATH,"*").find_element(By.TAG_NAME,"*").text if position_title_elem else ""
+                        work_times = work_times_elem.find_element(By.XPATH,"*").text if work_times_elem else ""
+
+                        # Safely extract times and duration
+                        if work_times:
+                            parts = work_times.split("·")
+                            times = parts[0].strip() if parts else ""
+                            duration = parts[1].strip() if len(parts) > 1 else None
+                        else:
+                            times = ""
+                            duration = None
+
+                        from_date = " ".join(times.split(" ")[:2]) if times else ""
+                        to_date = " ".join(times.split(" ")[3:]) if times and len(times.split(" ")) > 3 else ""
+
+                        experience = Experience(
+                            position_title=position_title,
+                            from_date=from_date,
+                            to_date=to_date,
+                            duration=duration,
+                            location=location,
+                            description=description,
+                            institution_name=company,
+                            linkedin_url=company_linkedin_url
+                        )
+                        self.add_experience(experience)
+                    except (NoSuchElementException, IndexError):
+                        # Skip this description if elements are missing
+                        continue
             else:
                 description = position_summary_text.text if position_summary_text else ""
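The "safely extract times and duration" block now appears twice in `get_experiences`, once per branch; the logic distills to a small pure function such as the hypothetical `split_work_times` below, which could be factored out to remove the duplication. The new `except NoSuchElementException` clauses also assume person.py imports it (`from selenium.common.exceptions import NoSuchElementException`):

```python
def split_work_times(work_times):
    # Hypothetical helper: parse a LinkedIn tenure string such as
    # "Jan 2020 - Present · 3 yrs 2 mos" into (times, duration).
    if not work_times:
        return "", None
    parts = work_times.split("·")
    times = parts[0].strip() if parts else ""
    duration = parts[1].strip() if len(parts) > 1 else None
    return times, duration

times, duration = split_work_times("Jan 2020 - Present · 3 yrs 2 mos")
# times == "Jan 2020 - Present", duration == "3 yrs 2 mos"
from_date = " ".join(times.split(" ")[:2]) if times else ""   # "Jan 2020"
to_date = " ".join(times.split(" ")[3:]) if times and len(times.split(" ")) > 3 else ""  # "Present"
```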
position_summary_text else "" @@ -215,47 +250,69 @@ def get_educations(self): self.scroll_to_bottom() main_list = self.wait_for_element_to_load(name="pvs-list__container", base=main) for position in main_list.find_elements(By.CLASS_NAME,"pvs-list__paged-list-item"): - position = position.find_element(By.XPATH,"//div[@data-view-name='profile-component-entity']") - institution_logo_elem, position_details = position.find_elements(By.XPATH,"*") - - # company elem - institution_linkedin_url = institution_logo_elem.find_element(By.XPATH,"*").get_attribute("href") - - # position details - position_details_list = position_details.find_elements(By.XPATH,"*") - position_summary_details = position_details_list[0] if len(position_details_list) > 0 else None - position_summary_text = position_details_list[1] if len(position_details_list) > 1 else None - outer_positions = position_summary_details.find_element(By.XPATH,"*").find_elements(By.XPATH,"*") - - institution_name = outer_positions[0].find_element(By.TAG_NAME,"span").text - if len(outer_positions) > 1: - degree = outer_positions[1].find_element(By.TAG_NAME,"span").text - else: - degree = None - - if len(outer_positions) > 2: - times = outer_positions[2].find_element(By.TAG_NAME,"span").text + try: + position = position.find_element(By.CSS_SELECTOR, "div[data-view-name='profile-component-entity']") + + # Fix: Handle case where more than 2 elements are returned + elements = position.find_elements(By.XPATH,"*") + if len(elements) < 2: + continue # Skip if we don't have enough elements + + institution_logo_elem = elements[0] + position_details = elements[1] + + # institution elem + try: + institution_linkedin_url = institution_logo_elem.find_element(By.XPATH,"*").get_attribute("href") + except NoSuchElementException: + institution_linkedin_url = None + + # position details + position_details_list = position_details.find_elements(By.XPATH,"*") + position_summary_details = position_details_list[0] if len(position_details_list) > 0 else None + position_summary_text = position_details_list[1] if len(position_details_list) > 1 else None + + if not position_summary_details: + continue + + outer_positions = position_summary_details.find_element(By.XPATH,"*").find_elements(By.XPATH,"*") + + institution_name = outer_positions[0].find_element(By.TAG_NAME,"span").text if outer_positions else "" + degree = outer_positions[1].find_element(By.TAG_NAME,"span").text if len(outer_positions) > 1 else None - if times != "": - from_date = times.split(" ")[times.split(" ").index("-")-1] if len(times.split(" "))>3 else times.split(" ")[0] - to_date = times.split(" ")[-1] - else: from_date = None to_date = None + + if len(outer_positions) > 2: + try: + times = outer_positions[2].find_element(By.TAG_NAME,"span").text + + if times and "-" in times: + split_times = times.split(" ") + dash_index = split_times.index("-") if "-" in split_times else -1 + + if dash_index > 0: + from_date = split_times[dash_index-1] + if dash_index < len(split_times) - 1: + to_date = split_times[-1] + except (NoSuchElementException, ValueError): + from_date = None + to_date = None + description = position_summary_text.text if position_summary_text else "" - - description = position_summary_text.text if position_summary_text else "" - - education = Education( - from_date=from_date, - to_date=to_date, - description=description, - degree=degree, - institution_name=institution_name, - linkedin_url=institution_linkedin_url - ) - self.add_education(education) + education = Education( + 
+                    from_date=from_date,
+                    to_date=to_date,
+                    description=description,
+                    degree=degree,
+                    institution_name=institution_name,
+                    linkedin_url=institution_linkedin_url
+                )
+                self.add_education(education)
+            except (NoSuchElementException, IndexError):
+                # Skip this education entry if elements are missing
+                continue
 
     def get_name_and_location(self):
         top_panel = self.driver.find_element(By.XPATH, "//*[@class='mt2 relative']")
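The dash-index date parsing in `get_educations` is the subtlest change in this hunk. A standalone sketch of its behavior, with `parse_education_dates` as a hypothetical name used only for illustration:

```python
def parse_education_dates(times):
    # Hypothetical helper mirroring the dash-index logic above.
    # Handles "2015 - 2019" style ranges; anything else yields (None, None).
    from_date, to_date = None, None
    if times and "-" in times:
        split_times = times.split(" ")
        dash_index = split_times.index("-") if "-" in split_times else -1
        if dash_index > 0:
            from_date = split_times[dash_index - 1]
        if dash_index < len(split_times) - 1:
            to_date = split_times[-1]
    return from_date, to_date

assert parse_education_dates("2015 - 2019") == ("2015", "2019")
assert parse_education_dates("2019") == (None, None)
assert parse_education_dates("") == (None, None)
```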