#!/usr/bin/env python3
"""
FormDS Web Scraper - Multi-City Version 5 (Incremental Mode)

Scrapes new companies from FormDS.com/locales, stopping per city once a company
already stored for that city's latest filing date is reached. Previous data is
preserved, and the filing_exclude list is applied only to the current run.

Author: HemantaBhusal
Version: 5.0 (Incremental Multi-City)
"""

import sys
import os
import urllib.request
import urllib.error
from html.parser import HTMLParser
import mysql.connector
import time
import logging
import concurrent.futures
from datetime import datetime

sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from global_config import db_config

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')


class CityListParser(HTMLParser):
    """Extracts (link, name) pairs for city pages from the /locales index."""

    def __init__(self):
        super().__init__()
        self.city_links = []
        self.in_link = False
        self.current_link = ""

    def handle_starttag(self, tag, attrs):
        if tag == 'a':
            for attr_name, attr_value in attrs:
                if attr_name == 'href' and attr_value.startswith('/locales/') and attr_value != '/locales':
                    self.current_link = attr_value
                    self.in_link = True
                    break

    def handle_endtag(self, tag):
        if tag == 'a' and self.in_link:
            self.in_link = False

    def handle_data(self, data):
        if self.in_link and self.current_link:
            city_name = data.strip()
            if city_name and 'Startups and Growing Businesses' in city_name:
                self.city_links.append((self.current_link, city_name))
                self.current_link = ""


class TableParser(HTMLParser):
    """Parses a city's filings table; the first cell keeps its link as 'text|href'."""

    def __init__(self):
        super().__init__()
        self.in_table = False
        self.in_row = False
        self.in_cell = False
        self.current_row = []
        self.current_cell = ""
        self.rows = []
        self.cell_count = 0
        self.current_link = ""
        self.in_link = False

    def handle_starttag(self, tag, attrs):
        if tag == 'table':
            self.in_table = True
        elif tag == 'tr' and self.in_table:
            self.in_row = True
            self.current_row = []
            self.cell_count = 0
        elif tag == 'td' and self.in_row:
            self.in_cell = True
            self.current_cell = ""
            self.current_link = ""
            self.cell_count += 1
        elif tag == 'a' and self.in_cell:
            self.in_link = True
            for attr_name, attr_value in attrs:
                if attr_name == 'href':
                    self.current_link = attr_value
                    break

    def handle_endtag(self, tag):
        if tag == 'table':
            self.in_table = False
        elif tag == 'tr' and self.in_row:
            self.rows.append(self.current_row)
            self.in_row = False
        elif tag == 'td' and self.in_cell:
            if self.cell_count == 1 and self.current_link:
                combined = f"{self.current_cell.strip()}|{self.current_link}"
                self.current_row.append(combined)
            else:
                self.current_row.append(self.current_cell.strip())
            self.in_cell = False
        elif tag == 'a':
            self.in_link = False

    def handle_data(self, data):
        if self.in_cell:
            self.current_cell += data


class CompanyDetailParser(HTMLParser):
    """Parses a company detail page for its address block and contacts table."""

    def __init__(self):
        super().__init__()
        self.in_address = False
        self.in_address_tag = False
        self.in_table = False
        self.in_thead = False
        self.in_tbody = False
        self.in_row = False
        self.in_cell = False
        self.address_lines = []
        self.current_text = ""
        self.table_headers = []
        self.current_row = []
        self.table_data = []
        self.cell_index = 0

    def handle_starttag(self, tag, attrs):
        if tag == 'div':
            for attr_name, attr_value in attrs:
                if attr_name == 'id' and attr_value == 'address-div':
                    self.in_address = True
                    break
        elif tag == 'address' and self.in_address:
            self.in_address_tag = True
        elif tag == 'table':
            self.in_table = True
            self.table_headers = []
            self.table_data = []
        elif tag == 'thead' and self.in_table:
            self.in_thead = True
        elif tag == 'tbody' and self.in_table:
            self.in_tbody = True
        elif tag == 'tr':
            self.in_row = True
            self.current_row = []
            self.cell_index = 0
        elif tag == 'th' and self.in_row:
            self.in_cell = True
            self.current_text = ""
        elif tag == 'td' and self.in_row:
            self.in_cell = True
            self.current_text = ""

    def handle_endtag(self, tag):
        if tag == 'div' and self.in_address:
            self.in_address = False
        elif tag == 'address' and self.in_address_tag:
            self.in_address_tag = False
        elif tag == 'table':
            self.in_table = False
        elif tag == 'thead':
            self.in_thead = False
        elif tag == 'tbody':
            self.in_tbody = False
        elif tag == 'tr' and self.in_row:
            if self.in_thead and self.current_row:
                self.table_headers = self.current_row
            elif not self.in_thead and self.current_row:
                self.table_data.append(self.current_row)
            self.in_row = False
        elif tag in ['th', 'td'] and self.in_cell:
            self.current_row.append(self.current_text.strip())
            self.in_cell = False
            self.cell_index += 1

    def handle_data(self, data):
        if self.in_address_tag:
            clean_data = data.strip()
            if clean_data:
                self.address_lines.append(clean_data)
        elif self.in_cell:
            self.current_text += data

    def get_address(self):
        return ' '.join(self.address_lines)

    def get_contacts(self):
        contacts = []
        if self.table_headers:
            headers_lower = [h.lower() for h in self.table_headers]
            if 'name' in headers_lower and 'role' in headers_lower:
                for row in self.table_data:
                    if len(row) >= 2:
                        name = row[0].strip()
                        role = row[1].strip().replace(', ', '/').replace(',', '/')
                        if name and role:
                            contacts.append(f"{name} : {role}")
        return ", ".join(contacts)


class MultiCityFormDSScraper:
    """Incrementally scrapes FormDS city pages and stores new filings in MySQL."""

    def __init__(self, max_workers=8, delay=1):
        self.base_url = "https://www.formds.com"
        self.locales_url = "https://www.formds.com/locales"
        self.max_workers = max_workers
        self.delay = delay

        # Statistics
        self.stats = {
            'cities_processed': 0,
            'total_companies': 0,
            'excluded_companies': 0,
            'existing_companies': 0,
            'successful_details': 0,
            'failed_details': 0
        }

        # Load exclusion list and latest filings from database
        self.excluded_types = self.load_exclusions_from_db()
        self.latest_filings = self.load_latest_filings()
        logging.info(f"Initialized scraper: {max_workers} workers, {delay}s delay, exclusions={self.excluded_types}")

    def load_exclusions_from_db(self):
        connection = None
        cursor = None
        try:
            connection = mysql.connector.connect(**db_config)
            cursor = connection.cursor()
            cursor.execute("SELECT exclude_text FROM filing_exclude")
            results = cursor.fetchall()
            excluded_types = [row[0].strip() for row in results if row[0] and row[0].strip()]
            return excluded_types
        except mysql.connector.Error as error:
            logging.error(f"Database error loading exclusions: {error}")
            return []
        except Exception as e:
            logging.error(f"Error loading exclusions from database: {e}")
            return []
        finally:
            if connection and connection.is_connected():
                if cursor:
                    cursor.close()
                connection.close()

    def load_latest_filings(self):
        connection = None
        cursor = None
        try:
            connection = mysql.connector.connect(**db_config)
            cursor = connection.cursor()
            cursor.execute("SELECT cities, MAX(filing_date) AS latest_date FROM filings5 GROUP BY cities")
            latest_dates = {row[0]: row[1] for row in cursor.fetchall()}
            latest_filings = {}
            for city, latest_date in latest_dates.items():
                if latest_date:
                    cursor.execute(
                        "SELECT company FROM filings5 WHERE cities = %s AND filing_date = %s",
                        (city, latest_date)
                    )
                    companies = {row[0] for row in cursor.fetchall()}
                    latest_filings[city] = {'latest_date': latest_date, 'companies': companies}
                    logging.info(f"  Loaded for {city}: latest_date={latest_date}, companies={len(companies)}")
            return latest_filings
        except mysql.connector.Error as error:
            logging.error(f"Database error loading latest filings: {error}")
            return {}
        except Exception as e:
            logging.error(f"Error loading latest filings: {e}")
            return {}
        finally:
            if connection and connection.is_connected():
                if cursor:
                    cursor.close()
                connection.close()

    def make_request(self, url):
        try:
            req = urllib.request.Request(
                url,
                headers={
                    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
                    'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
                    'Accept-Language': 'en-US,en;q=0.5',
                    'Connection': 'keep-alive'
                }
            )
            with urllib.request.urlopen(req, timeout=15) as response:
                return response.read().decode('utf-8')
        except urllib.error.HTTPError as e:
            logging.error(f"HTTP {e.code} error for {url}: {e.reason}")
            return None
        except urllib.error.URLError as e:
            logging.error(f"URL error for {url}: {e.reason}")
            return None
        except Exception as e:
            logging.error(f"Unexpected error fetching {url}: {e}")
            return None

    def get_city_list(self):
        html_content = self.make_request(self.locales_url)
        if not html_content:
            logging.error("Failed to fetch city list")
            return []
        parser = CityListParser()
        parser.feed(html_content)
        cities = []
        for link, name in parser.city_links:
            clean_name = self.clean_city_name(name)
            cities.append((link, clean_name))
        return cities

    def clean_city_name(self, original_name):
        clean_name = original_name.replace(" Startups and Growing Businesses", "")
        if not clean_name.endswith(" Area"):
            clean_name += " Area"
        return clean_name

    def extract_company_type(self, company_name):
        company_name = company_name.strip()
        if company_name.endswith(')'):
            start_idx = company_name.rfind('(')
            if start_idx != -1:
                return company_name[start_idx + 1:-1].strip()
        return ""

    def is_excluded(self, company_name):
        if not self.excluded_types:
            return False
        company_type = self.extract_company_type(company_name)
        return company_type in self.excluded_types

    def get_city_page(self, city_link, page_num=1):
        url = f"{self.base_url}{city_link}"
        if page_num > 1:
            url += f"?page={page_num}"
        html_content = self.make_request(url)
        if not html_content:
            return None
        parser = TableParser()
        parser.feed(html_content)
        return parser.rows

    def get_company_details(self, company_url):
        try:
            if not company_url:
                return "", ""
            full_url = f"{self.base_url}{company_url}"
            html_content = self.make_request(full_url)
            if not html_content:
                return "", ""
            parser = CompanyDetailParser()
            parser.feed(html_content)
            return parser.get_address(), parser.get_contacts()
        except Exception as e:
            logging.warning(f"Failed to get details for {company_url}: {e}")
            return "", ""

    def get_company_details_batch(self, company_urls_data):
        if not company_urls_data:
            return []
        results = [None] * len(company_urls_data)

        def fetch_single_company(index_url_tuple):
            index, (company_name, url) = index_url_tuple
            if not url:
                return index, ("", "")
            try:
                address, contacts = self.get_company_details(url)
                self.stats['successful_details'] += 1
                return index, (address, contacts)
            except Exception as e:
                logging.warning(f"Error fetching details for {company_name}: {e}")
                self.stats['failed_details'] += 1
                return index, ("", "")

        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = []
            for i, (company_name, url) in enumerate(company_urls_data):
                future = executor.submit(fetch_single_company, (i, (company_name, url)))
                futures.append(future)
                if i > 0:
                    time.sleep(self.delay / self.max_workers)
            for future in concurrent.futures.as_completed(futures):
                try:
                    index, details = future.result()
                    results[index] = details
                except Exception as e:
                    logging.error(f"Thread execution error: {e}")
        return results

    def scrape_city(self, city_link, city_name, max_pages=10):
        logging.info(f"Scraping {city_name}...")
        city_companies = []
        excluded_count = 0
        latest_info = self.latest_filings.get(city_name, {'latest_date': None, 'companies': set()})
        latest_date = latest_info['latest_date']
        known_companies = latest_info['companies']

        for page_num in range(1, max_pages + 1):
            logging.info(f"  Fetching {city_name} page {page_num}...")
            rows = self.get_city_page(city_link, page_num)
            if not rows or len(rows) <= 1:
                logging.info(f"  No more data on page {page_num}")
                break

            page_companies = []
            company_urls_data = []
            stop_scraping = False

            for row_idx, row in enumerate(rows[1:], 1):
                if len(row) >= 6:
                    try:
                        company_data = row[0]
                        if '|' in company_data:
                            company, company_url = company_data.split('|', 1)
                        else:
                            company = company_data
                            company_url = ""
                        company = ' '.join(company.split())

                        filing_date_str = row[4]
                        try:
                            filing_date = datetime.strptime(filing_date_str, '%Y-%m-%d').date()
                        except ValueError:
                            logging.warning(f"  Invalid date format for {company} on page {page_num}: {filing_date_str}")
                            continue

                        if latest_date:
                            if filing_date < latest_date:
                                logging.info(f"  Stopped: {company} has earlier date {filing_date} vs latest {latest_date}")
                                stop_scraping = True
                                break
                            elif filing_date == latest_date and company in known_companies:
                                logging.info(f"  Stopped: {company} found in DB for {city_name} on {latest_date}")
                                self.stats['existing_companies'] += 1
                                stop_scraping = True
                                break

                        if self.is_excluded(company):
                            excluded_count += 1
                            self.stats['excluded_companies'] += 1
                            logging.debug(f"  Excluded: {company} ({self.extract_company_type(company)})")
                            continue

                        company_info = {
                            'cities': city_name,
                            'company': company,
                            'reported_funding': row[2],
                            'incremental_cash': row[3],
                            'filing_date': filing_date_str,
                            'new_or_amended': row[5],
                            'company_address': "",
                            'company_all_contact': ""
                        }
                        page_companies.append(company_info)
                        company_urls_data.append((company, company_url))
                    except Exception as e:
                        logging.warning(f"  Error parsing row {row_idx} on page {page_num}: {e}")
                        continue

            # Process and extend page_companies even if stop triggered (to save pre-stop rows)
            if page_companies:
                logging.info(f"  Processing details for {len(page_companies)} companies on page {page_num}...")
                start_time = time.time()
                company_details = self.get_company_details_batch(company_urls_data)
                for i, (address, contacts) in enumerate(company_details):
                    if i < len(page_companies):
                        page_companies[i]['company_address'] = address
                        page_companies[i]['company_all_contact'] = contacts
                elapsed = time.time() - start_time
                logging.info(f"  Details processed in {elapsed:.1f}s")
                city_companies.extend(page_companies)
                self.stats['total_companies'] += len(page_companies)

            if stop_scraping:
                break
            time.sleep(self.delay)

        self.stats['cities_processed'] += 1
        logging.info(f"Completed {city_name}: {len(city_companies)} new companies, {excluded_count} excluded")
        return city_companies

    def scrape_all_cities(self, test_cities=None):
        cities = self.get_city_list()
        if not cities:
            logging.error("No cities found to scrape")
            return []
        if test_cities:
            cities = cities[:test_cities]
            logging.info(f"TEST MODE: Processing first {test_cities} cities")

        all_companies = []
        for i, (city_link, city_name) in enumerate(cities, 1):
            logging.info(f"Processing city {i}/{len(cities)}: {city_name}")
            try:
                city_companies = self.scrape_city(city_link, city_name)
                all_companies.extend(city_companies)
                logging.info(f"Progress: {self.stats['cities_processed']}/{len(cities)} cities, "
                             f"{self.stats['total_companies']} new companies, "
                             f"{self.stats['excluded_companies']} excluded")
            except Exception as e:
                logging.error(f"Failed to scrape {city_name}: {e}")
                continue
        return all_companies

    def save_to_mysql(self, data):
        if not data:
            logging.warning("No data to save")
            return
        connection = None
        cursor = None
        try:
            connection = mysql.connector.connect(**db_config)
            cursor = connection.cursor()
            insert_query = """
                INSERT INTO filings5
                    (cities, company, reported_funding, incremental_cash,
                     filing_date, new_or_amended, company_address, company_all_contact)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
            """
            batch_data = [
                (
                    filing['cities'],
                    filing['company'],
                    filing['reported_funding'],
                    filing['incremental_cash'],
                    filing['filing_date'],
                    filing['new_or_amended'],
                    filing['company_address'],
                    filing['company_all_contact']
                )
                for filing in data
            ]
            cursor.executemany(insert_query, batch_data)
            connection.commit()
            logging.info(f"Successfully saved {len(data)} new records to database")
        except mysql.connector.Error as error:
            logging.error(f"MySQL database error: {error}")
            if connection:
                connection.rollback()
        except Exception as e:
            logging.error(f"Unexpected database error: {e}")
            if connection:
                connection.rollback()
        finally:
            if connection and connection.is_connected():
                if cursor:
                    cursor.close()
                connection.close()

    def print_final_stats(self):
        logging.info("=" * 60)
        logging.info("FINAL STATISTICS:")
        logging.info(f"Cities processed: {self.stats['cities_processed']}")
        logging.info(f"Total new companies found: {self.stats['total_companies']}")
        logging.info(f"Excluded companies: {self.stats['excluded_companies']}")
        logging.info(f"Existing companies encountered (stop markers): {self.stats['existing_companies']}")
        logging.info(f"Successful detail fetches: {self.stats['successful_details']}")
        logging.info(f"Failed detail fetches: {self.stats['failed_details']}")
        logging.info("=" * 60)


def main():
    try:
        test_cities = None
        if len(sys.argv) > 1 and sys.argv[1].startswith('--'):
            try:
                test_cities = int(sys.argv[1][2:])
                logging.info(f"Test mode enabled: processing {test_cities} cities")
            except ValueError:
                logging.error(f"Invalid test parameter: {sys.argv[1]}")
                sys.exit(1)

        scraper = MultiCityFormDSScraper()
        all_companies = scraper.scrape_all_cities(test_cities=test_cities)

        if all_companies:
            scraper.save_to_mysql(all_companies)
            print(f"Successfully processed {len(all_companies)} new companies across "
                  f"{scraper.stats['cities_processed']} cities")
        else:
            print("No new companies found to save")

        scraper.print_final_stats()
    except KeyboardInterrupt:
        logging.info("Scraping interrupted by user")
        sys.exit(0)
    except Exception as e:
        logging.error(f"Scraper failed: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
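
# Usage sketch (the script filename below is an assumption, not taken from the
# source; the --N flag mirrors the test-mode parsing in main(), and the
# `filings5` / `filing_exclude` tables referenced above must already exist in
# the database described by global_config.db_config):
#
#   python3 formds_scraper_v5.py        # full incremental run across all cities
#   python3 formds_scraper_v5.py --3    # test mode: scrape only the first 3 cities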