pull/268/head
fakebranden 2025-04-16 20:06:52 +00:00
parent c310ff61ee
commit 5d45628f5c
1 changed files with 96 additions and 66 deletions

View File

@ -1,10 +1,14 @@
import csv, datetime, os, sys, json import csv
import datetime
import os
import sys
import json
from jobspy.google import Google from jobspy.google import Google
from jobspy.linkedin import LinkedIn from jobspy.linkedin import LinkedIn
from jobspy.indeed import Indeed from jobspy.indeed import Indeed
from jobspy.model import ScraperInput from jobspy.model import ScraperInput
# Define sources # Define job sources
sources = { sources = {
"google": Google, "google": Google,
"linkedin": LinkedIn, "linkedin": LinkedIn,
@ -14,92 +18,118 @@ sources = {
def sanitize_email(email): def sanitize_email(email):
return email.replace("@", "_at_").replace(".", "_") return email.replace("@", "_at_").replace(".", "_")
def load_config(email): def load_config_file(email=None):
safe_email = sanitize_email(email) if email:
config_path = os.path.join("configs", f"config_{safe_email}.json") safe_email = sanitize_email(email)
if not os.path.exists(config_path): config_path = os.path.join("configs", f"config_{safe_email}.json")
raise FileNotFoundError(f"❌ Config for {email} not found at {config_path}") if os.path.exists(config_path):
with open(config_path, "r", encoding="utf-8") as f: print(f"📂 Loading config for {email}{config_path}")
return json.load(f), safe_email with open(config_path, "r", encoding="utf-8") as f:
return json.load(f), safe_email
else:
raise FileNotFoundError(f"❌ Config for {email} not found at {config_path}")
else:
raise ValueError("❌ Email must be passed as argument")
def scrape_jobs(search_terms, results_wanted, max_days_old, target_state):
# Ensure numeric values are converted
results_wanted = int(results_wanted)
max_days_old = int(max_days_old)
def scrape_jobs(search_terms, results_wanted_str, max_days_old_str, target_state):
# Convert string values to integers
results_wanted = int(results_wanted_str.strip())
max_days_old = int(max_days_old_str.strip())
today = datetime.date.today()
all_jobs = [] all_jobs = []
today = datetime.date.today()
print(f"\n🔍 Scraping jobs for: {search_terms}")
for term in search_terms: for term in search_terms:
for source, Scraper in sources.items(): for source_name, source_class in sources.items():
print(f"🔍 Scraping {term} from {source}") print(f"🚀 Scraping '{term}' from {source_name}...")
scraper = Scraper() scraper = source_class()
criteria = ScraperInput(site_type=[source_name], search_term=term, results_wanted=results_wanted)
try: try:
jobs = scraper.scrape(ScraperInput( response = scraper.scrape(criteria)
site_type=[source],
search_term=term,
results_wanted=results_wanted
)).jobs
except Exception as e: except Exception as e:
print(f"⚠️ {source} error: {e}") print(f"❌ Error scraping {source_name}: {e}")
continue continue
for job in jobs: for job in response.jobs:
city = job.location.city.strip() if job.location.city else "Unknown"
state = job.location.state.strip().upper() if job.location.state else "Unknown"
country = str(job.location.country) if job.location.country else "Unknown"
if not any(t.lower() in job.title.lower() for t in search_terms):
continue
if job.date_posted and (today - job.date_posted).days <= max_days_old: if job.date_posted and (today - job.date_posted).days <= max_days_old:
if target_state == (job.location.state or "").upper() or job.is_remote: if state == target_state or job.is_remote:
if any(term.lower() in job.title.lower() for term in search_terms): all_jobs.append({
all_jobs.append({ "Job ID": job.id,
"Job ID": job.id, "Job Title (Primary)": job.title,
"Job Title (Primary)": job.title, "Company Name": job.company_name or "Unknown",
"Company Name": job.company_name or "Unknown", "Industry": job.company_industry or "Not Provided",
"Industry": job.company_industry or "Not Provided", "Experience Level": job.job_level or "Not Provided",
"Experience Level": job.job_level or "Not Provided", "Job Type": job.job_type[0].name if job.job_type else "Not Provided",
"Job Type": job.job_type[0].name if job.job_type else "Not Provided", "Is Remote": job.is_remote,
"Is Remote": job.is_remote, "Currency": job.compensation.currency if job.compensation else "",
"Currency": job.compensation.currency if job.compensation else "", "Salary Min": job.compensation.min_amount if job.compensation else "",
"Salary Min": job.compensation.min_amount if job.compensation else "", "Salary Max": job.compensation.max_amount if job.compensation else "",
"Salary Max": job.compensation.max_amount if job.compensation else "", "Date Posted": job.date_posted.strftime("%Y-%m-%d") if job.date_posted else "Not Provided",
"Date Posted": job.date_posted.strftime("%Y-%m-%d"), "Location City": city,
"Location City": job.location.city or "Unknown", "Location State": state,
"Location State": (job.location.state or "Unknown").upper(), "Location Country": country,
"Location Country": job.location.country or "Unknown", "Job URL": job.job_url,
"Job URL": job.job_url, "Job Description": job.description.replace(",", "") if job.description else "No description available",
"Job Description": job.description.replace(",", "") if job.description else "No description", "Job Source": source_name
"Job Source": source })
}) print(f"{len(all_jobs)} jobs matched.")
print(f"✅ Found {len(all_jobs)} jobs")
return all_jobs return all_jobs
def save_to_csv(jobs, path): def save_jobs_to_csv(jobs, output_path):
os.makedirs(os.path.dirname(path), exist_ok=True) if not jobs:
print("⚠️ No jobs found.")
return
fieldnames = [ fieldnames = [
"Job ID", "Job Title (Primary)", "Company Name", "Industry", "Job ID", "Job Title (Primary)", "Company Name", "Industry",
"Experience Level", "Job Type", "Is Remote", "Currency", "Experience Level", "Job Type", "Is Remote", "Currency",
"Salary Min", "Salary Max", "Date Posted", "Location City", "Salary Min", "Salary Max", "Date Posted", "Location City",
"Location State", "Location Country", "Job URL", "Job Description", "Job Source" "Location State", "Location Country", "Job URL", "Job Description",
"Job Source"
] ]
header = "|~|".join(fieldnames)
rows = [header] + ["|~|".join(str(job.get(col, "Not Provided")).replace(",", "").strip() for col in fieldnames) for job in jobs]
with open(path, "w", encoding="utf-8") as f:
f.write(",".join(rows))
print(f"💾 Saved output to: {path}")
header = "|~|".join(fieldnames)
rows = [header]
for job in jobs:
row = []
for field in fieldnames:
value = str(job.get(field, "Not Provided")).replace(",", "").strip()
row.append(value if value else "Not Provided")
rows.append("|~|".join(row))
output = ",".join(rows)
os.makedirs(os.path.dirname(output_path), exist_ok=True)
with open(output_path, "w", encoding="utf-8") as f:
f.write(output)
print(f"💾 Saved output to: {output_path}")
# MAIN
if __name__ == "__main__": if __name__ == "__main__":
try: try:
if len(sys.argv) != 3: user_email = sys.argv[1] if len(sys.argv) >= 2 else None
raise ValueError("❌ Usage: python job_scraper_dynamic.py <user_email> <run_id>") config, safe_email = load_config_file(user_email)
user_email, run_id = sys.argv[1], sys.argv[2] job_data = scrape_jobs(
config, safe_email = load_config(user_email) search_terms=config["search_terms"],
results_wanted=config["results_wanted"],
jobs = scrape_jobs( max_days_old=config["max_days_old"],
config["search_terms"], target_state=config["target_state"]
config["results_wanted"],
config["max_days_old"],
config["target_state"]
) )
save_to_csv(jobs, f"outputs/jobspy_output_{safe_email}_{run_id}.csv") output_file = f"outputs/jobspy_output_{safe_email}.csv"
save_jobs_to_csv(job_data, output_file)
except Exception as e: except Exception as e:
print(f"❌ Fatal error: {e}") print(f"❌ Fatal Error: {e}")
sys.exit(1) sys.exit(1)