-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathscraper_fixed.py
More file actions
113 lines (95 loc) · 3.78 KB
/
scraper_fixed.py
File metadata and controls
113 lines (95 loc) · 3.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
#!/usr/bin/env python3
import csv
import time
import pandas as pd
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.chrome.service import Service
from bs4 import BeautifulSoup
# Configuration
INPUT_CSV = 'collected_thumbnails.csv'  # source CSV; the first column is read as the list of URLs to visit
OUTPUT_CSV = 'extracted_tables.csv'  # destination CSV for the flattened per-URL table data
PAGE_LOAD_WAIT = 5 # Seconds to wait for JS to render after loading
def read_urls_from_csv(file_path):
    """Read URLs from the first column of a CSV file.

    Blank/NaN rows and whitespace-only cells are dropped; each remaining
    value is stripped. Returns a list of URL strings, or [] on any read
    error (best-effort: the error is printed, not raised).
    """
    try:
        df = pd.read_csv(file_path)
        column_name = df.columns[0]
        # BUGFIX: coerce with str() before strip(). pandas may infer a
        # non-string dtype for the column, and bare url.strip() would then
        # raise AttributeError; the filter below already used str(url).
        urls = [str(url).strip() for url in df[column_name].dropna() if str(url).strip()]
        print(f"Found {len(urls)} URLs in {file_path}")
        return urls
    except Exception as e:
        print(f"Error reading CSV file: {e}")
        return []
def setup_browser():
    """Build and return a headless Chrome WebDriver.

    Runs with GPU, sandbox, and /dev/shm usage disabled so it behaves in
    containerized/CI environments.
    """
    opts = Options()
    for flag in ("--headless", "--disable-gpu", "--no-sandbox", "--disable-dev-shm-usage"):
        opts.add_argument(flag)
    # Adjust this path if chromedriver lives elsewhere on your system.
    driver_service = Service(executable_path="./chromedriver/chromedriver")
    return webdriver.Chrome(service=driver_service, options=opts)
def parse_table_from_html(html, url):
    """Extract the first <table> in *html* into a flat dict.

    Keys: 'URL', 'HEADERS' (pipe-joined <th> texts from the first row),
    one 'ROW_LABEL_<label>' entry per data row, and one
    '<label>_<header>' entry per data cell. Returns None when the page
    contains no <table>.
    """
    soup = BeautifulSoup(html, 'html.parser')
    tables = soup.find_all('table')
    print(f"[DEBUG] Found {len(tables)} table(s) at {url}")
    if not tables:
        print(f"[FAIL] No tables found in {url}")
        print(f"[HTML SAMPLE] {html[:500]}")  # Print first 500 characters of HTML
        return None
    table = tables[0]
    headers = []
    header_row = table.find('tr')
    if header_row:
        headers = [th.get_text(strip=True) for th in header_row.find_all('th')]
        print(f"[DEBUG] Found headers: {headers}")
    table_data = {'URL': url}
    table_data['HEADERS'] = '|'.join(headers)
    rows = table.find_all('tr')
    # BUGFIX: only skip the first row when it actually carried <th> headers.
    # Previously rows[1:] was taken unconditionally, silently dropping the
    # first data row of header-less tables.
    data_rows = rows[1:] if headers else rows
    for row in data_rows:
        cells = row.find_all(['td', 'th'])
        if len(cells) > 1:
            row_label = cells[0].get_text(strip=True)
            table_data[f'ROW_LABEL_{row_label}'] = row_label
            for i in range(1, len(cells)):
                # Fall back to positional COL<i> names past the header count.
                header = headers[i - 1] if i - 1 < len(headers) else f'COL{i}'
                cell_text = cells[i].get_text(strip=True)
                key = f'{row_label}_{header}'
                table_data[key] = cell_text
    # Print the extracted table data to the console
    print(f"\n[EXTRACTED TABLE DATA] for URL: {url}")
    for key, value in table_data.items():
        print(f"{key}: {value}")
    return table_data
def main():
    """Drive the scrape: read URLs, fetch each page with Selenium, extract
    the first table of each, and write all rows to OUTPUT_CSV.

    Per-URL failures are logged and skipped so one bad page does not abort
    the whole run.
    """
    print("[START] Selenium-based scraper starting.")
    urls = read_urls_from_csv(INPUT_CSV)
    if not urls:
        return
    browser = setup_browser()
    results = []
    try:
        for i, url in enumerate(urls):
            print(f"\n[PROCESS] ({i+1}/{len(urls)}): {url}")
            try:
                browser.get(url)
                time.sleep(PAGE_LOAD_WAIT)  # let JS-rendered content appear
                html = browser.page_source
                table_data = parse_table_from_html(html, url)
                if table_data:
                    results.append(table_data)
                    print(f"[SUCCESS] Table extracted from {url}")
                else:
                    print(f"[WARN] No table extracted from {url}")
            except Exception as e:
                print(f"[ERROR] Selenium error on {url}: {e}")
    finally:
        # BUGFIX: quit inside finally so the headless Chrome process is
        # torn down even when the loop is aborted (e.g. KeyboardInterrupt);
        # previously an early exit leaked the browser process.
        browser.quit()
    if results:
        # Union of keys across all rows, so DictWriter accepts every dict.
        all_keys = sorted(set(k for r in results for k in r.keys()))
        with open(OUTPUT_CSV, 'w', newline='', encoding='utf-8') as f:
            writer = csv.DictWriter(f, fieldnames=all_keys)
            writer.writeheader()
            writer.writerows(results)
        print(f"\n[FINISH] Wrote {len(results)} results to {OUTPUT_CSV}")
    else:
        print("\n[FINISH] No data to write.")


if __name__ == '__main__':
    main()