Compare commits

...

48 Commits

Author SHA1 Message Date
fakebranden
77cc1f8550 update for artifact with run ID 2025-04-15 09:01:33 +00:00
fakebranden
84b4524c43 fix the create or modify output file in folder 2025-04-15 08:30:44 +00:00
fakebranden
e6ae23c76f update output csv in yml for correct format 2025-04-15 08:06:36 +00:00
fakebranden
0103e11234 add test file to outputs for visibility 2025-04-15 08:01:10 +00:00
fakebranden
697ae5c8c9 delete manual output file from testing 2025-04-15 07:49:44 +00:00
fakebranden
9e0674f7fc updated yml so jobspy scraper runs properly 2025-04-15 07:38:56 +00:00
fakebranden
bbdad3584e updates to capital letter in configs files 2025-04-15 07:34:20 +00:00
fakebranden
a045bb442a add configs folder 2025-04-15 06:51:22 +00:00
fakebranden
3eb4c122e7 Delete configs/config_branden_at_autoemployme_onmicrosoft_com.json 2025-04-15 02:26:08 -04:00
fakebranden
74877c5fd8 Delete configs/config_Branden_at_autoemployme_onmicrosoft_com.json 2025-04-15 02:26:00 -04:00
JobSpy Bot
0a475e312f 🔄 Updated config for Branden@autoemployme.onmicrosoft.com 2025-04-15 02:11:26 -04:00
JobSpy Bot
e0514d218e 🔄 Updated config for Branden@autoemployme.onmicrosoft.com 2025-04-15 01:25:35 -04:00
fakebranden
529aa8a1f4 fixed configs and outputs file paths add & modify 2025-04-15 02:13:24 +00:00
fakebranden
93a21941eb outputs folder added sample file 2025-04-15 01:54:37 +00:00
fakebranden
8f8b39c6e2 outputs and configs folder added 2025-04-15 01:52:03 +00:00
fakebranden
cdcd79edfe add configs folder 2025-04-15 00:46:30 +00:00
fakebranden
89a40dc3e3 updated py and yml dynamic 2025-04-14 23:39:28 +00:00
fakebranden
6a326b7dd4 dynamic yml and py update 2025-04-14 21:37:07 +00:00
fakebranden
0a5c5fa9b3 yml matches dynamic output 2025-04-14 21:26:28 +00:00
fakebranden
e22e4cc092 updated dynamic 2025-04-14 21:02:02 +00:00
fakebranden
0abe28fae4 further dynamic updates to scraper for output 2025-04-14 19:00:30 +00:00
fakebranden
31d0389dd8 updated dynamic workflow added 2025-04-14 18:30:34 +00:00
fakebranden
fb9ab3a315 dynamic jobscraper py and config file 2025-04-14 18:21:11 +00:00
fakebranden
c34eff610f updated criteria 2025-04-07 16:12:53 +00:00
fakebranden
e9160a0b4c adjusted scraper for better delimiter and comma only between records 2025-03-12 00:47:10 +00:00
fakebranden
cd916c7978 reverted ziprecruiter 2025-03-12 00:16:09 +00:00
fakebranden
25c084ca2c removed commas in fields 2025-03-12 00:03:02 +00:00
fakebranden
341deba465 updated job description no limit 2025-03-10 19:40:12 +00:00
fakebranden
5337b3ec7f new exact job scraper 2025-03-10 19:11:36 +00:00
fakebranden
0171ecc4a0 update search criteria format 2025-03-10 05:05:17 +00:00
fakebranden
e191405c8e change actions to read 2025-03-08 09:16:16 +00:00
fakebranden
a2d139cb96 removed schedule cron so power automate can trigger the workflow 2025-03-07 21:54:00 +00:00
fakebranden
9e41e6e9db fixed yml file 2025-03-07 21:26:09 +00:00
fakebranden
bb7d4c55ed updated yml from requirements.txt# 2025-03-07 21:23:16 +00:00
fakebranden
58cc1937bb added req. 2025-03-07 21:21:01 +00:00
fakebranden
60819a8fca Merge branch 'main' of https://github.com/fakebranden/JobSpy 2025-03-07 21:15:32 +00:00
fakebranden
1c59cd6738 git add requirements.txt
git commit -m "Added requirements.txt"
git push origin main
2025-03-07 20:55:22 +00:00
fakebranden
eed96e4c04 Create requirements.txt 2025-03-07 15:53:26 -05:00
fakebranden
83c64f4bca Update jobspy_scraper.yml 2025-03-07 15:43:59 -05:00
fakebranden
d8ad9da1c0 Update jobspy_scraper.yml 2025-03-07 15:39:12 -05:00
fakebranden
5f5738eaaa new yml 2025-03-07 19:18:44 +00:00
fakebranden
e1da326317 all funtionality 2025-03-07 18:57:14 +00:00
Cullen Watson
6782b9884e fix:workflow 2025-03-01 14:49:31 -06:00
Cullen Watson
94c74d60f2 enh:workflow manual run 2025-03-01 14:47:24 -06:00
Cullen Watson
5463e5a664 chore:version 2025-03-01 14:38:25 -06:00
arkhy
ed139e7e6b added missing EU countries and languages (#250)
Co-authored-by: Kate Arkhangelskaya <ekar559e@tu-dresden.de>
2025-03-01 14:30:08 -06:00
Cullen Watson
5bd199d0a5 Merge branch 'main' of https://github.com/Bunsly/JobSpy 2025-02-21 14:15:06 -06:00
Cullen Watson
4ec308a302 refactor:organize code 2025-02-21 14:14:55 -06:00
36 changed files with 2489 additions and 701 deletions

View File

@@ -0,0 +1,49 @@
---
# Scrapes jobs for one user (selected by email) and uploads the CSV artifact.
name: JobSpy Scraper Dynamic Workflow

on:
  workflow_dispatch:
    inputs:
      user_email:
        description: 'Email of user'
        required: true
        default: 'Branden@autoemployme.onmicrosoft.com'

permissions:
  contents: read
  id-token: write

jobs:
  scrape_jobs:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout Repo
        uses: actions/checkout@v3

      - name: Set Up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'

      - name: Install Dependencies
        run: |
          pip install --upgrade pip
          pip install -r requirements.txt

      # Derive a filesystem-safe token from the email (@ -> _at_, . -> _)
      # plus a unique run id (epoch seconds); both name the output file below
      # and must match the sanitization done in job_scraper_dynamic.py.
      - name: Sanitize Email + Create Run ID
        id: vars
        run: |
          safe_email=$(echo "${{ github.event.inputs.user_email }}" | sed 's/@/_at_/g; s/\./_/g')
          run_id=$(date +%s)
          echo "safe_email=$safe_email" >> $GITHUB_OUTPUT
          echo "run_id=$run_id" >> $GITHUB_OUTPUT

      - name: Run Job Scraper
        run: |
          python job_scraper_dynamic.py "${{ github.event.inputs.user_email }}" "${{ steps.vars.outputs.run_id }}"

      - name: Upload Output Artifact
        uses: actions/upload-artifact@v4
        with:
          name: jobspy_output_${{ steps.vars.outputs.safe_email }}_${{ steps.vars.outputs.run_id }}
          path: outputs/jobspy_output_${{ steps.vars.outputs.safe_email }}_${{ steps.vars.outputs.run_id }}.csv

48
.github/workflows/jobspy_scraper.yml vendored Normal file
View File

@@ -0,0 +1,48 @@
---
# Manually-triggered scrape that uploads jobspy_output.csv as an artifact.
name: JobSpy Scraper Workflow

on:
  workflow_dispatch:  # Allows manual trigger from GitHub or Power Automate
  # Remove or comment out the schedule to prevent auto-runs
  # schedule:
  #   - cron: '0 */6 * * *'  # Runs every 6 hours (DISABLED)

permissions:
  actions: read
  contents: read
  id-token: write

jobs:
  scrape_jobs:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repository
        uses: actions/checkout@v3

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt

      - name: Run JobSpy Scraper
        run: python job_scraper_exact_match.py

      # Fail fast with an explicit message when the scraper produced no file,
      # instead of letting the upload step fail obscurely.
      - name: Debug - Check if jobspy_output.csv exists
        run: |
          if [ ! -f jobspy_output.csv ]; then
            echo "❌ ERROR: jobspy_output.csv not found!"
            exit 1
          else
            echo "✅ jobspy_output.csv found, proceeding to upload..."
          fi

      - name: Upload JobSpy Output as Artifact
        uses: actions/upload-artifact@v4  # Explicitly using latest version
        with:
          name: jobspy-results
          path: jobspy_output.csv

View File

@@ -1,50 +1,37 @@
name: Publish Python 🐍 distributions 📦 to PyPI
name: Publish JobSpy to PyPi
on:
pull_request:
types:
- closed
permissions:
contents: write
push:
branches:
- main
workflow_dispatch:
jobs:
build-n-publish:
name: Build and publish Python 🐍 distributions 📦 to PyPI
name: Build and publish JobSpy to PyPi
runs-on: ubuntu-latest
if: github.event.pull_request.merged == true && github.event.pull_request.base.ref == 'main'
steps:
- uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: "3.10"
- name: Install dependencies
run: pip install toml
- name: Increment version
run: python increment_version.py
- name: Commit version increment
run: |
git config --global user.name 'github-actions'
git config --global user.email 'github-actions@github.com'
git add pyproject.toml
git commit -m 'Increment version'
- name: Push changes
run: git push
- name: Install poetry
run: pip install poetry --user
run: >-
python3 -m
pip install
poetry
--user
- name: Build distribution 📦
run: poetry build
run: >-
python3 -m
poetry
build
- name: Publish distribution 📦 to PyPI
if: startsWith(github.ref, 'refs/tags') || github.event_name == 'workflow_dispatch'
uses: pypa/gh-action-pypi-publish@release/v1
with:
password: ${{ secrets.PYPI_API_TOKEN }}
password: ${{ secrets.PYPI_API_TOKEN }}

8
configs/config.json Normal file
View File

@@ -0,0 +1,8 @@
{
"search_terms": ["IT Support", "Help Desk"],
"results_wanted": 50,
"max_days_old": 7,
"target_state": "NY",
"user_email": "Branden@autoemployme.onmicrosoft.com"
}

View File

@@ -0,0 +1,8 @@
{
"search_terms": ["Testing", "Help Desk", "Support"],
"results_wanted": 50,
"max_days_old": 7,
"target_state": "NY",
"user_email": "Branden@autoemployme.onmicrosoft.com"
}

View File

@@ -1,21 +0,0 @@
def increment_version(version):
    """Return *version* ("MAJOR.MINOR.PATCH") with the patch component bumped.

    Raises ValueError if the string is not three dot-separated integers.
    """
    major, minor, patch = map(int, version.split('.'))
    patch += 1
    return f"{major}.{minor}.{patch}"


def main():
    """Load pyproject.toml, bump [tool.poetry].version, and write it back."""
    # Lazy import: toml is third-party and only needed for the file rewrite,
    # so the pure increment_version helper stays importable without it.
    import toml

    with open('pyproject.toml', 'r') as file:
        pyproject = toml.load(file)
    current_version = pyproject['tool']['poetry']['version']
    new_version = increment_version(current_version)
    pyproject['tool']['poetry']['version'] = new_version
    with open('pyproject.toml', 'w') as file:
        toml.dump(pyproject, file)
    print(f"Version updated from {current_version} to {new_version}")


if __name__ == "__main__":
    # Guard added so importing this module no longer rewrites pyproject.toml;
    # `python increment_version.py` behaves exactly as before.
    main()

116
job_scraper.py Normal file
View File

@@ -0,0 +1,116 @@
import csv
import datetime
from jobspy.google import Google
from jobspy.linkedin import LinkedIn
from jobspy.indeed import Indeed
from jobspy.ziprecruiter import ZipRecruiter
from jobspy.model import ScraperInput
# Define job sources
# Mapping of source key -> scraper class. The key is passed as the
# ScraperInput site_type and recorded in the "Job Source" CSV column.
sources = {
"google": Google,
"linkedin": LinkedIn,
"indeed": Indeed,
"zip_recruiter": ZipRecruiter,
}
# Define search preferences
# Note: remote jobs are also kept even when outside target_state (see scrape_jobs).
search_terms = ["Automation Engineer", "CRM Manager", "Implementation Specialist"]
results_wanted = 200 # Fetch more jobs
max_days_old = 2 # Fetch jobs posted in last 48 hours
target_state = "NY" # Only keep jobs from New York
def scrape_jobs(search_terms, results_wanted, max_days_old, target_state):
    """Scrape jobs from multiple sources and filter by state.

    Every (search_term, source) pair is scraped; a job is kept only when it
    was posted within *max_days_old* days AND is either located in
    *target_state* or flagged remote.

    :param search_terms: list of query strings to scrape for
    :param results_wanted: max results requested per term/source pair
    :param max_days_old: drop postings older than this many days
    :param target_state: two-letter state code to keep (plus remote jobs)
    :return: list of flat dicts keyed by the CSV column names
    """
    all_jobs = []
    today = datetime.date.today()
    print("\n🔎 DEBUG: Fetching jobs for search terms:", search_terms)
    for search_term in search_terms:
        for source_name, source_class in sources.items():
            print(f"\n🚀 Scraping {search_term} from {source_name}...")
            scraper = source_class()
            search_criteria = ScraperInput(
                site_type=[source_name],
                search_term=search_term,
                results_wanted=results_wanted,
            )
            job_response = scraper.scrape(search_criteria)
            for job in job_response.jobs:
                # Normalize location fields — any of them may be missing.
                location_city = job.location.city.strip() if job.location.city else "Unknown"
                location_state = job.location.state.strip().upper() if job.location.state else "Unknown"
                location_country = str(job.location.country) if job.location.country else "Unknown"
                # Debug: show every fetched job, including ones filtered out below.
                print(f"📍 Fetched Job: {job.title} - {location_city}, {location_state}, {location_country}")
                # Ensure the job is recent
                if job.date_posted and (today - job.date_posted).days <= max_days_old:
                    if location_state == target_state or job.is_remote:
                        print(f"✅ MATCH (In NY or Remote): {job.title} - {location_city}, {location_state} (Posted {job.date_posted})")
                        all_jobs.append({
                            "Job ID": job.id,
                            "Job Title (Primary)": job.title,
                            "Company Name": job.company_name if job.company_name else "Unknown",
                            "Industry": job.company_industry if job.company_industry else "Not Provided",
                            "Experience Level": job.job_level if job.job_level else "Not Provided",
                            "Job Type": job.job_type[0].name if job.job_type else "Not Provided",
                            "Is Remote": job.is_remote,
                            "Currency": job.compensation.currency if job.compensation else "",
                            "Salary Min": job.compensation.min_amount if job.compensation else "",
                            "Salary Max": job.compensation.max_amount if job.compensation else "",
                            "Date Posted": job.date_posted.strftime("%Y-%m-%d") if job.date_posted else "Not Provided",
                            "Location City": location_city,
                            "Location State": location_state,
                            "Location Country": location_country,
                            "Job URL": job.job_url,
                            # Description truncated to keep CSV cells manageable.
                            "Job Description": job.description[:500] if job.description else "No description available",
                            "Job Source": source_name
                        })
                    else:
                        print(f"❌ Ignored (Wrong State): {job.title} - {location_city}, {location_state} (Posted {job.date_posted})")
                else:
                    print(f"⏳ Ignored (Too Old): {job.title} - {location_city}, {location_state} (Posted {job.date_posted})")
    print(f"\n{len(all_jobs)} jobs retrieved in NY")
    return all_jobs
def save_jobs_to_csv(jobs, filename="jobspy_output.csv"):
    """Save job data to a standard CSV file.

    :param jobs: list of dicts keyed by the column names below
    :param filename: destination path (overwritten if it exists)

    Does nothing beyond a warning when *jobs* is empty, so an empty scrape
    never produces a header-only file.
    """
    if not jobs:
        print("⚠️ No jobs found matching criteria.")
        return
    fieldnames = [
        "Job ID", "Job Title (Primary)", "Company Name", "Industry",
        "Experience Level", "Job Type", "Is Remote", "Currency",
        "Salary Min", "Salary Max", "Date Posted", "Location City",
        "Location State", "Location Country", "Job URL", "Job Description",
        "Job Source"
    ]
    with open(filename, mode="w", newline="", encoding="utf-8") as file:
        writer = csv.DictWriter(file, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(jobs)
    # Fixed: interpolate the actual output path (the format string was broken).
    print(f"✅ Jobs saved to {filename} ({len(jobs)} entries)")
# Run the scraper with multiple job searches
# (module-level: executes immediately when the script is run, using the
# search_terms / results_wanted / max_days_old / target_state settings above)
job_data = scrape_jobs(
search_terms=search_terms,
results_wanted=results_wanted,
max_days_old=max_days_old,
target_state=target_state
)
# Save results to CSV (default path: jobspy_output.csv in the working directory)
save_jobs_to_csv(job_data)

94
job_scraper_dynamic.py Normal file
View File

@@ -0,0 +1,94 @@
import csv, datetime, os, sys, json
from jobspy.google import Google
from jobspy.linkedin import LinkedIn
from jobspy.indeed import Indeed
from jobspy.model import ScraperInput
# Source key -> scraper class. The key is passed as the ScraperInput
# site_type and recorded in the "Job Source" output column.
# (ZipRecruiter is not included in this scraper.)
sources = {
"google": Google,
"linkedin": LinkedIn,
"indeed": Indeed,
}
def sanitize_email(email):
    """Return *email* as a filesystem-safe token ("@" -> "_at_", "." -> "_")."""
    with_at = email.replace("@", "_at_")
    return with_at.replace(".", "_")
def load_config(email):
    """Locate and parse the per-user config file for *email*.

    The file is expected at configs/config_<safe_email>.json, where
    <safe_email> comes from sanitize_email().

    :return: (config_dict, safe_email) tuple
    :raises FileNotFoundError: when the per-user config file does not exist
    """
    safe_email = sanitize_email(email)
    config_path = os.path.join("configs", f"config_{safe_email}.json")
    if not os.path.exists(config_path):
        raise FileNotFoundError(f"❌ Config for {email} not found at {config_path}")
    with open(config_path, "r", encoding="utf-8") as f:
        return json.load(f), safe_email
def scrape_jobs(search_terms, results_wanted, max_days_old, target_state):
    """Scrape every term from every source and filter the results.

    A job is kept when it is recent (<= *max_days_old* days), located in
    *target_state* or remote, AND its title contains at least one of the
    configured search terms.

    :return: list of flat dicts keyed by the output column names
    """
    today = datetime.date.today()
    all_jobs = []
    for term in search_terms:
        for source, Scraper in sources.items():
            print(f"🔍 Scraping {term} from {source}")
            scraper = Scraper()
            try:
                jobs = scraper.scrape(ScraperInput(
                    site_type=[source],
                    search_term=term,
                    results_wanted=results_wanted
                )).jobs
            except Exception as e:
                # Best-effort: one failing source must not abort the others.
                print(f"⚠️ {source} error: {e}")
                continue
            for job in jobs:
                if job.date_posted and (today - job.date_posted).days <= max_days_old:
                    if target_state == (job.location.state or "").upper() or job.is_remote:
                        # The genexpr's "term" deliberately shadows the loop
                        # variable: a job is kept if ANY term matches its title.
                        if any(term.lower() in job.title.lower() for term in search_terms):
                            all_jobs.append({
                                "Job ID": job.id,
                                "Job Title (Primary)": job.title,
                                "Company Name": job.company_name or "Unknown",
                                "Industry": job.company_industry or "Not Provided",
                                "Experience Level": job.job_level or "Not Provided",
                                "Job Type": job.job_type[0].name if job.job_type else "Not Provided",
                                "Is Remote": job.is_remote,
                                "Currency": job.compensation.currency if job.compensation else "",
                                "Salary Min": job.compensation.min_amount if job.compensation else "",
                                "Salary Max": job.compensation.max_amount if job.compensation else "",
                                "Date Posted": job.date_posted.strftime("%Y-%m-%d"),
                                "Location City": job.location.city or "Unknown",
                                "Location State": (job.location.state or "Unknown").upper(),
                                "Location Country": job.location.country or "Unknown",
                                "Job URL": job.job_url,
                                # Commas stripped so the custom CSV record separator stays unambiguous.
                                "Job Description": job.description.replace(",", "") if job.description else "No description",
                                "Job Source": source
                            })
    print(f"✅ Found {len(all_jobs)} jobs")
    return all_jobs
def save_to_csv(jobs, path):
    """Serialize *jobs* to *path* in the custom downstream format:

    fields within a record are joined with "|~|", records are joined with
    ",", commas inside field values are removed, and missing fields become
    "Not Provided". The header is the first record.
    """
    # Only create the parent directory when the path actually has one:
    # os.makedirs("") raises FileNotFoundError for a bare filename.
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    fieldnames = [
        "Job ID", "Job Title (Primary)", "Company Name", "Industry",
        "Experience Level", "Job Type", "Is Remote", "Currency",
        "Salary Min", "Salary Max", "Date Posted", "Location City",
        "Location State", "Location Country", "Job URL", "Job Description", "Job Source"
    ]
    header = "|~|".join(fieldnames)
    rows = [header] + [
        "|~|".join(str(job.get(col, "Not Provided")).replace(",", "").strip()
                   for col in fieldnames)
        for job in jobs
    ]
    with open(path, "w", encoding="utf-8") as f:
        f.write(",".join(rows))
    print(f"💾 Saved output to: {path}")
if __name__ == "__main__":
    try:
        # CLI contract: python job_scraper_dynamic.py <user_email> <run_id>
        if len(sys.argv) != 3:
            raise ValueError("❌ Usage: python job_scraper_dynamic.py <user_email> <run_id>")
        user_email, run_id = sys.argv[1], sys.argv[2]
        config, safe_email = load_config(user_email)
        jobs = scrape_jobs(config["search_terms"], config["results_wanted"], config["max_days_old"], config["target_state"])
        # The output name must match what the CI workflow uploads as an artifact.
        save_to_csv(jobs, f"outputs/jobspy_output_{safe_email}_{run_id}.csv")
    except Exception as e:
        # Exit nonzero on any failure so the CI step is marked failed.
        print(f"❌ Fatal error: {e}")
        sys.exit(1)

146
job_scraper_exact_match.py Normal file
View File

@@ -0,0 +1,146 @@
import csv
import datetime
import os
from jobspy.google import Google
from jobspy.linkedin import LinkedIn
from jobspy.indeed import Indeed
from jobspy.model import ScraperInput
# Define job sources
# Mapping of source key -> scraper class. The key is passed as the
# ScraperInput site_type and recorded in the "Job Source" output column.
sources = {
"google": Google,
"linkedin": LinkedIn,
"indeed": Indeed,
}
# Define search preferences
# Jobs whose titles match none of these terms are excluded (see scrape_jobs).
search_terms = ["Automation Engineer", "CRM Manager", "Implementation Specialist", "CRM", "Project Manager", "POS", "Microsoft Power", "IT Support"]
results_wanted = 100 # Fetch more jobs
max_days_old = 2 # Fetch jobs posted in last 48 hours
target_state = "NY" # Only keep jobs from New York
def scrape_jobs(search_terms, results_wanted, max_days_old, target_state):
    """Scrape jobs from multiple sources and filter by state.

    Unlike the generic scraper, a job is kept only when its title contains
    at least one of the configured search terms, in addition to being recent
    (<= *max_days_old* days) and located in *target_state* or remote.

    :return: list of flat dicts keyed by the output column names
    """
    all_jobs = []
    today = datetime.date.today()
    print("\n🔎 DEBUG: Fetching jobs for search terms:", search_terms)
    for search_term in search_terms:
        for source_name, source_class in sources.items():
            print(f"\n🚀 Scraping {search_term} from {source_name}...")
            scraper = source_class()
            search_criteria = ScraperInput(
                site_type=[source_name],
                search_term=search_term,
                results_wanted=results_wanted,
            )
            job_response = scraper.scrape(search_criteria)
            for job in job_response.jobs:
                # Normalize location fields — any of them may be missing.
                location_city = job.location.city.strip() if job.location.city else "Unknown"
                location_state = job.location.state.strip().upper() if job.location.state else "Unknown"
                location_country = str(job.location.country) if job.location.country else "Unknown"
                # Debug: show every fetched job, including ones filtered out below.
                print(f"📍 Fetched Job: {job.title} - {location_city}, {location_state}, {location_country}")
                # Exclude jobs that don't explicitly match the search terms.
                if not any(term.lower() in job.title.lower() for term in search_terms):
                    print(f"🚫 Excluding: {job.title} (Doesn't match {search_terms})")
                    continue  # Skip this job
                # Ensure the job is recent
                if job.date_posted and (today - job.date_posted).days <= max_days_old:
                    # Only accept jobs if they're in NY or Remote
                    if location_state == target_state or job.is_remote:
                        print(f"✅ MATCH: {job.title} - {location_city}, {location_state} (Posted {job.date_posted})")
                        all_jobs.append({
                            "Job ID": job.id,
                            "Job Title (Primary)": job.title,
                            "Company Name": job.company_name if job.company_name else "Unknown",
                            "Industry": job.company_industry if job.company_industry else "Not Provided",
                            "Experience Level": job.job_level if job.job_level else "Not Provided",
                            "Job Type": job.job_type[0].name if job.job_type else "Not Provided",
                            "Is Remote": job.is_remote,
                            "Currency": job.compensation.currency if job.compensation else "",
                            "Salary Min": job.compensation.min_amount if job.compensation else "",
                            "Salary Max": job.compensation.max_amount if job.compensation else "",
                            "Date Posted": job.date_posted.strftime("%Y-%m-%d") if job.date_posted else "Not Provided",
                            "Location City": location_city,
                            "Location State": location_state,
                            "Location Country": location_country,
                            "Job URL": job.job_url,
                            # Commas stripped to protect the custom record separator.
                            "Job Description": job.description.replace(",", "") if job.description else "No description available",
                            "Job Source": source_name
                        })
                    else:
                        print(f"❌ Ignored (Wrong State): {job.title} - {location_city}, {location_state} (Posted {job.date_posted})")
                else:
                    print(f"⏳ Ignored (Too Old): {job.title} - {location_city}, {location_state} (Posted {job.date_posted})")
    print(f"\n{len(all_jobs)} jobs retrieved in NY")
    return all_jobs
def save_jobs_to_csv(jobs, filename="jobspy_output.csv"):
    """Save job data to a CSV file with custom formatting:

    - Fields within a record are separated by the custom delimiter |~|
    - Records are separated by a comma
    - All commas in field values are removed
    - Blank fields are replaced with 'Not Provided'
    """
    if not jobs:
        print("⚠️ No jobs found matching criteria.")
        return
    # Remove old CSV file before writing
    if os.path.exists(filename):
        os.remove(filename)
    fieldnames = [
        "Job ID", "Job Title (Primary)", "Company Name", "Industry",
        "Experience Level", "Job Type", "Is Remote", "Currency",
        "Salary Min", "Salary Max", "Date Posted", "Location City",
        "Location State", "Location Country", "Job URL", "Job Description",
        "Job Source"
    ]
    # Build header record using the custom field delimiter
    header_record = "|~|".join(fieldnames)
    records = [header_record]
    for job in jobs:
        row = []
        for field in fieldnames:
            value = str(job.get(field, "")).strip()
            if not value:
                value = "Not Provided"
            # Remove all commas so the record separator stays unambiguous
            value = value.replace(",", "")
            row.append(value)
        records.append("|~|".join(row))
    # Join records with a comma as the record separator
    output = ",".join(records)
    with open(filename, "w", encoding="utf-8") as file:
        file.write(output)
    # Fixed: interpolate the actual output path (the format string was broken).
    print(f"✅ Jobs saved to {filename} ({len(jobs)} entries)")
# Run the scraper with multiple job searches
# (module-level: executes immediately when the script is run, using the
# search_terms / results_wanted / max_days_old / target_state settings above)
job_data = scrape_jobs(
search_terms=search_terms,
results_wanted=results_wanted,
max_days_old=max_days_old,
target_state=target_state
)
# Save results to CSV with custom formatting (|~| field delimiter, "," records)
save_jobs_to_csv(job_data)

View File

@@ -1,25 +1,27 @@
from __future__ import annotations
import pandas as pd
from typing import Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Tuple
from .jobs import JobType, Location
from .scrapers.utils import set_logger_level, extract_salary, create_logger
from .scrapers.indeed import IndeedScraper
from .scrapers.ziprecruiter import ZipRecruiterScraper
from .scrapers.glassdoor import GlassdoorScraper
from .scrapers.google import GoogleJobsScraper
from .scrapers.linkedin import LinkedInScraper
from .scrapers.bayt import BaytScraper
from .scrapers import SalarySource, ScraperInput, Site, JobResponse, Country
from .scrapers.exceptions import (
LinkedInException,
IndeedException,
ZipRecruiterException,
GlassdoorException,
GoogleJobsException,
import pandas as pd
from jobspy.bayt import BaytScraper
from jobspy.glassdoor import Glassdoor
from jobspy.google import Google
from jobspy.indeed import Indeed
from jobspy.linkedin import LinkedIn
from jobspy.model import JobType, Location, JobResponse, Country
from jobspy.model import SalarySource, ScraperInput, Site
from jobspy.util import (
set_logger_level,
extract_salary,
create_logger,
get_enum_from_value,
map_str_to_site,
convert_to_annual,
desired_order,
)
from jobspy.ziprecruiter import ZipRecruiter
def scrape_jobs(
@@ -33,7 +35,6 @@ def scrape_jobs(
easy_apply: bool | None = None,
results_wanted: int = 15,
country_indeed: str = "usa",
hyperlinks: bool = False,
proxies: list[str] | str | None = None,
ca_cert: str | None = None,
description_format: str = "markdown",
@@ -46,28 +47,18 @@ def scrape_jobs(
**kwargs,
) -> pd.DataFrame:
"""
Simultaneously scrapes job data from multiple job sites.
:return: pandas dataframe containing job data
Scrapes job data from job boards concurrently
:return: Pandas DataFrame containing job data
"""
SCRAPER_MAPPING = {
Site.LINKEDIN: LinkedInScraper,
Site.INDEED: IndeedScraper,
Site.ZIP_RECRUITER: ZipRecruiterScraper,
Site.GLASSDOOR: GlassdoorScraper,
Site.GOOGLE: GoogleJobsScraper,
Site.LINKEDIN: LinkedIn,
Site.INDEED: Indeed,
Site.ZIP_RECRUITER: ZipRecruiter,
Site.GLASSDOOR: Glassdoor,
Site.GOOGLE: Google,
Site.BAYT: BaytScraper,
}
set_logger_level(verbose)
def map_str_to_site(site_name: str) -> Site:
return Site[site_name.upper()]
def get_enum_from_value(value_str):
for job_type in JobType:
if value_str in job_type.value:
return job_type
raise Exception(f"Invalid job type: {value_str}")
job_type = get_enum_from_value(job_type) if job_type else None
def get_site_type():
@@ -127,28 +118,12 @@ def scrape_jobs(
site_value, scraped_data = future.result()
site_to_jobs_dict[site_value] = scraped_data
def convert_to_annual(job_data: dict):
if job_data["interval"] == "hourly":
job_data["min_amount"] *= 2080
job_data["max_amount"] *= 2080
if job_data["interval"] == "monthly":
job_data["min_amount"] *= 12
job_data["max_amount"] *= 12
if job_data["interval"] == "weekly":
job_data["min_amount"] *= 52
job_data["max_amount"] *= 52
if job_data["interval"] == "daily":
job_data["min_amount"] *= 260
job_data["max_amount"] *= 260
job_data["interval"] = "yearly"
jobs_dfs: list[pd.DataFrame] = []
for site, job_response in site_to_jobs_dict.items():
for job in job_response.jobs:
job_data = job.dict()
job_url = job_data["job_url"]
job_data["job_url_hyper"] = f'<a href="{job_url}">{job_url}</a>'
job_data["site"] = site
job_data["company"] = job_data["company_name"]
job_data["job_type"] = (
@@ -211,38 +186,6 @@ def scrape_jobs(
# Step 2: Concatenate the filtered DataFrames
jobs_df = pd.concat(filtered_dfs, ignore_index=True)
# Desired column order
desired_order = [
"id",
"site",
"job_url_hyper" if hyperlinks else "job_url",
"job_url_direct",
"title",
"company",
"location",
"date_posted",
"job_type",
"salary_source",
"interval",
"min_amount",
"max_amount",
"currency",
"is_remote",
"job_level",
"job_function",
"listing_type",
"emails",
"description",
"company_industry",
"company_url",
"company_logo",
"company_url_direct",
"company_addresses",
"company_num_employees",
"company_revenue",
"company_description",
]
# Step 3: Ensure all desired columns are present, adding missing ones as empty
for column in desired_order:
if column not in jobs_df.columns:

View File

@@ -1,10 +1,3 @@
"""
jobspy.scrapers.bayt
~~~~~~~~~~~~~~~~~~~
This module contains routines to scrape Bayt.
"""
from __future__ import annotations
import random
@@ -12,9 +5,16 @@ import time
from bs4 import BeautifulSoup
from .. import Scraper, ScraperInput, Site
from ..utils import create_logger, create_session
from ...jobs import JobPost, JobResponse, Location, Country
from jobspy.model import (
Scraper,
ScraperInput,
Site,
JobPost,
JobResponse,
Location,
Country,
)
from jobspy.util import create_logger, create_session
log = create_logger("Bayt")

View File

@@ -1,5 +1,5 @@
"""
jobspy.scrapers.exceptions
jobspy.jobboard.exceptions
~~~~~~~~~~~~~~~~~~~
This module contains the set of Scrapers' exceptions.

View File

@@ -1,41 +1,38 @@
"""
jobspy.scrapers.glassdoor
~~~~~~~~~~~~~~~~~~~
This module contains routines to scrape Glassdoor.
"""
from __future__ import annotations
import re
import json
import requests
from typing import Optional, Tuple
from typing import Tuple
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
from .constants import fallback_token, query_template, headers
from .. import Scraper, ScraperInput, Site
from ..utils import extract_emails_from_text, create_logger
from ..exceptions import GlassdoorException
from ..utils import (
from jobspy.glassdoor.constant import fallback_token, query_template, headers
from jobspy.glassdoor.util import (
get_cursor_for_page,
parse_compensation,
parse_location,
)
from jobspy.util import (
extract_emails_from_text,
create_logger,
create_session,
markdown_converter,
)
from ...jobs import (
from jobspy.exception import GlassdoorException
from jobspy.model import (
JobPost,
Compensation,
CompensationInterval,
Location,
JobResponse,
JobType,
DescriptionFormat,
Scraper,
ScraperInput,
Site,
)
log = create_logger("Glassdoor")
class GlassdoorScraper(Scraper):
class Glassdoor(Scraper):
def __init__(
self, proxies: list[str] | str | None = None, ca_cert: str | None = None
):
@@ -146,7 +143,7 @@ class GlassdoorScraper(Scraper):
except Exception as exc:
raise GlassdoorException(f"Glassdoor generated an exception: {exc}")
return jobs, self.get_cursor_for_page(
return jobs, get_cursor_for_page(
res_json["data"]["jobListings"]["paginationCursors"], page_num + 1
)
@@ -185,9 +182,9 @@ class GlassdoorScraper(Scraper):
if location_type == "S":
is_remote = True
else:
location = self.parse_location(location_name)
location = parse_location(location_name)
compensation = self.parse_compensation(job["header"])
compensation = parse_compensation(job["header"])
try:
description = self._fetch_job_description(job_id)
except:
@@ -321,44 +318,3 @@ class GlassdoorScraper(Scraper):
{"filterKey": "jobType", "values": self.scraper_input.job_type.value[0]}
)
return json.dumps([payload])
@staticmethod
def parse_compensation(data: dict) -> Optional[Compensation]:
pay_period = data.get("payPeriod")
adjusted_pay = data.get("payPeriodAdjustedPay")
currency = data.get("payCurrency", "USD")
if not pay_period or not adjusted_pay:
return None
interval = None
if pay_period == "ANNUAL":
interval = CompensationInterval.YEARLY
elif pay_period:
interval = CompensationInterval.get_interval(pay_period)
min_amount = int(adjusted_pay.get("p10") // 1)
max_amount = int(adjusted_pay.get("p90") // 1)
return Compensation(
interval=interval,
min_amount=min_amount,
max_amount=max_amount,
currency=currency,
)
@staticmethod
def get_job_type_enum(job_type_str: str) -> list[JobType] | None:
for job_type in JobType:
if job_type_str in job_type.value:
return [job_type]
@staticmethod
def parse_location(location_name: str) -> Location | None:
if not location_name or location_name == "Remote":
return
city, _, state = location_name.partition(", ")
return Location(city=city, state=state)
@staticmethod
def get_cursor_for_page(pagination_cursors, page_num):
for cursor_data in pagination_cursors:
if cursor_data["pageNumber"] == page_num:
return cursor_data["cursor"]

42
jobspy/glassdoor/util.py Normal file
View File

@@ -0,0 +1,42 @@
from jobspy.model import Compensation, CompensationInterval, Location, JobType
def parse_compensation(data: dict) -> Compensation | None:
    """Build a Compensation from a Glassdoor job-header dict.

    Returns None when the header lacks a pay period or adjusted pay.
    """
    pay_period = data.get("payPeriod")
    adjusted_pay = data.get("payPeriodAdjustedPay")
    currency = data.get("payCurrency", "USD")
    if not pay_period or not adjusted_pay:
        return None
    interval = None
    if pay_period == "ANNUAL":
        interval = CompensationInterval.YEARLY
    elif pay_period:
        interval = CompensationInterval.get_interval(pay_period)
    # p10/p90 percentile bounds, truncated to whole currency units.
    min_amount = int(adjusted_pay.get("p10") // 1)
    max_amount = int(adjusted_pay.get("p90") // 1)
    return Compensation(
        interval=interval,
        min_amount=min_amount,
        max_amount=max_amount,
        currency=currency,
    )
def get_job_type_enum(job_type_str: str) -> list[JobType] | None:
    """Map a raw job-type string onto the matching JobType, wrapped in a
    one-item list; returns None (implicitly) when nothing matches."""
    for job_type in JobType:
        if job_type_str in job_type.value:
            return [job_type]
def parse_location(location_name: str) -> Location | None:
    """Split a "City, ST" string into a Location.

    Returns None for empty input or the literal "Remote"."""
    if not location_name or location_name == "Remote":
        return
    city, _, state = location_name.partition(", ")
    return Location(city=city, state=state)
def get_cursor_for_page(pagination_cursors, page_num):
    """Return the pagination cursor for *page_num* from a list of
    {"pageNumber", "cursor"} dicts, or None (implicitly) when absent."""
    for cursor_data in pagination_cursors:
        if cursor_data["pageNumber"] == page_num:
            return cursor_data["cursor"]

View File

@@ -1,10 +1,3 @@
"""
jobspy.scrapers.google
~~~~~~~~~~~~~~~~~~~
This module contains routines to scrape Google.
"""
from __future__ import annotations
import math
@@ -13,23 +6,21 @@ import json
from typing import Tuple
from datetime import datetime, timedelta
from .constants import headers_jobs, headers_initial, async_param
from .. import Scraper, ScraperInput, Site
from ..utils import extract_emails_from_text, create_logger, extract_job_type
from ..utils import (
create_session,
)
from ...jobs import (
from jobspy.google.constant import headers_jobs, headers_initial, async_param
from jobspy.model import (
Scraper,
ScraperInput,
Site,
JobPost,
JobResponse,
Location,
JobType,
)
log = create_logger("Google")
from jobspy.util import extract_emails_from_text, extract_job_type, create_session
from jobspy.google.util import log, find_job_info_initial_page, find_job_info
class GoogleJobsScraper(Scraper):
class Google(Scraper):
def __init__(
self, proxies: list[str] | str | None = None, ca_cert: str | None = None
):
@@ -135,7 +126,7 @@ class GoogleJobsScraper(Scraper):
pattern_fc = r'<div jsname="Yust4d"[^>]+data-async-fc="([^"]+)"'
match_fc = re.search(pattern_fc, response.text)
data_async_fc = match_fc.group(1) if match_fc else None
jobs_raw = self._find_job_info_initial_page(response.text)
jobs_raw = find_job_info_initial_page(response.text)
jobs = []
for job_raw in jobs_raw:
job_post = self._parse_job(job_raw)
@@ -167,7 +158,7 @@ class GoogleJobsScraper(Scraper):
continue
job_d = json.loads(job_data)
job_info = self._find_job_info(job_d)
job_info = find_job_info(job_d)
job_post = self._parse_job(job_info)
if job_post:
jobs_on_page.append(job_post)
@@ -209,39 +200,3 @@ class GoogleJobsScraper(Scraper):
job_type=extract_job_type(description),
)
return job_post
@staticmethod
def _find_job_info(jobs_data: list | dict) -> list | None:
"""Iterates through the JSON data to find the job listings"""
if isinstance(jobs_data, dict):
for key, value in jobs_data.items():
if key == "520084652" and isinstance(value, list):
return value
else:
result = GoogleJobsScraper._find_job_info(value)
if result:
return result
elif isinstance(jobs_data, list):
for item in jobs_data:
result = GoogleJobsScraper._find_job_info(item)
if result:
return result
return None
@staticmethod
def _find_job_info_initial_page(html_text: str):
pattern = f'520084652":(' + r"\[.*?\]\s*])\s*}\s*]\s*]\s*]\s*]\s*]"
results = []
matches = re.finditer(pattern, html_text)
import json
for match in matches:
try:
parsed_data = json.loads(match.group(1))
results.append(parsed_data)
except json.JSONDecodeError as e:
log.error(f"Failed to parse match: {str(e)}")
results.append({"raw_match": match.group(0), "error": str(e)})
return results

41
jobspy/google/util.py Normal file
View File

@@ -0,0 +1,41 @@
import re
from jobspy.util import create_logger
log = create_logger("Google")
def find_job_info(jobs_data: list | dict) -> list | None:
"""Iterates through the JSON data to find the job listings"""
if isinstance(jobs_data, dict):
for key, value in jobs_data.items():
if key == "520084652" and isinstance(value, list):
return value
else:
result = find_job_info(value)
if result:
return result
elif isinstance(jobs_data, list):
for item in jobs_data:
result = find_job_info(item)
if result:
return result
return None
def find_job_info_initial_page(html_text: str):
    """
    Extract embedded job JSON payloads from Google's initial results HTML.

    Every occurrence of the "520084652" payload is located by regex and
    JSON-decoded. Payloads that fail to decode are logged and recorded as
    ``{"raw_match": ..., "error": ...}`` so callers can see what was skipped.

    :param html_text: raw HTML of the initial search-results page
    :return: list of parsed payloads (plus error records for bad matches)
    """
    import json  # imported at function top (was buried inside the loop region)

    # One raw-string pattern; the original needlessly split it into an
    # f-string with no placeholders concatenated to a raw string.
    pattern = r'520084652":(\[.*?\]\s*])\s*}\s*]\s*]\s*]\s*]\s*]'
    results = []
    for match in re.finditer(pattern, html_text):
        try:
            results.append(json.loads(match.group(1)))
        except json.JSONDecodeError as e:
            log.error(f"Failed to parse match: {str(e)}")
            results.append({"raw_match": match.group(0), "error": str(e)})
    return results

View File

@@ -1,39 +1,32 @@
"""
jobspy.scrapers.indeed
~~~~~~~~~~~~~~~~~~~
This module contains routines to scrape Indeed.
"""
from __future__ import annotations
import math
from typing import Tuple
from datetime import datetime
from typing import Tuple
from .constants import job_search_query, api_headers
from .. import Scraper, ScraperInput, Site
from ..utils import (
extract_emails_from_text,
get_enum_from_job_type,
markdown_converter,
create_session,
create_logger,
)
from ...jobs import (
from jobspy.indeed.constant import job_search_query, api_headers
from jobspy.indeed.util import is_job_remote, get_compensation, get_job_type
from jobspy.model import (
Scraper,
ScraperInput,
Site,
JobPost,
Compensation,
CompensationInterval,
Location,
JobResponse,
JobType,
DescriptionFormat,
)
from jobspy.util import (
extract_emails_from_text,
markdown_converter,
create_session,
create_logger,
)
log = create_logger("Indeed")
class IndeedScraper(Scraper):
class Indeed(Scraper):
def __init__(
self, proxies: list[str] | str | None = None, ca_cert: str | None = None
):
@@ -212,8 +205,10 @@ class IndeedScraper(Scraper):
description = job["description"]["html"]
if self.scraper_input.description_format == DescriptionFormat.MARKDOWN:
description = markdown_converter(description)
description = description.replace(",", "")
job_type = self._get_job_type(job["attributes"])
job_type = get_job_type(job["attributes"])
timestamp_seconds = job["datePublished"] / 1000
date_posted = datetime.fromtimestamp(timestamp_seconds).strftime("%Y-%m-%d")
employer = job["employer"].get("dossier") if job["employer"] else None
@@ -234,14 +229,14 @@ class IndeedScraper(Scraper):
country=job.get("location", {}).get("countryCode"),
),
job_type=job_type,
compensation=self._get_compensation(job["compensation"]),
compensation=get_compensation(job["compensation"]),
date_posted=date_posted,
job_url=job_url,
job_url_direct=(
job["recruit"].get("viewJobUrl") if job.get("recruit") else None
),
emails=extract_emails_from_text(description) if description else None,
is_remote=self._is_job_remote(job, description),
is_remote=is_job_remote(job, description),
company_addresses=(
employer_details["addresses"][0]
if employer_details.get("addresses")
@@ -265,86 +260,3 @@ class IndeedScraper(Scraper):
else None
),
)
@staticmethod
def _get_job_type(attributes: list) -> list[JobType]:
"""
Parses the attributes to get list of job types
:param attributes:
:return: list of JobType
"""
job_types: list[JobType] = []
for attribute in attributes:
job_type_str = attribute["label"].replace("-", "").replace(" ", "").lower()
job_type = get_enum_from_job_type(job_type_str)
if job_type:
job_types.append(job_type)
return job_types
@staticmethod
def _get_compensation(compensation: dict) -> Compensation | None:
"""
Parses the job to get compensation
:param job:
:return: compensation object
"""
if not compensation["baseSalary"] and not compensation["estimated"]:
return None
comp = (
compensation["baseSalary"]
if compensation["baseSalary"]
else compensation["estimated"]["baseSalary"]
)
if not comp:
return None
interval = IndeedScraper._get_compensation_interval(comp["unitOfWork"])
if not interval:
return None
min_range = comp["range"].get("min")
max_range = comp["range"].get("max")
return Compensation(
interval=interval,
min_amount=int(min_range) if min_range is not None else None,
max_amount=int(max_range) if max_range is not None else None,
currency=(
compensation["estimated"]["currencyCode"]
if compensation["estimated"]
else compensation["currencyCode"]
),
)
@staticmethod
def _is_job_remote(job: dict, description: str) -> bool:
"""
Searches the description, location, and attributes to check if job is remote
"""
remote_keywords = ["remote", "work from home", "wfh"]
is_remote_in_attributes = any(
any(keyword in attr["label"].lower() for keyword in remote_keywords)
for attr in job["attributes"]
)
is_remote_in_description = any(
keyword in description.lower() for keyword in remote_keywords
)
is_remote_in_location = any(
keyword in job["location"]["formatted"]["long"].lower()
for keyword in remote_keywords
)
return (
is_remote_in_attributes or is_remote_in_description or is_remote_in_location
)
@staticmethod
def _get_compensation_interval(interval: str) -> CompensationInterval:
interval_mapping = {
"DAY": "DAILY",
"YEAR": "YEARLY",
"HOUR": "HOURLY",
"WEEK": "WEEKLY",
"MONTH": "MONTHLY",
}
mapped_interval = interval_mapping.get(interval.upper(), None)
if mapped_interval and mapped_interval in CompensationInterval.__members__:
return CompensationInterval[mapped_interval]
else:
raise ValueError(f"Unsupported interval: {interval}")

80
jobspy/indeed/util.py Normal file
View File

@@ -0,0 +1,80 @@
from jobspy.model import CompensationInterval, JobType, Compensation
from jobspy.util import get_enum_from_job_type
def get_job_type(attributes: list) -> list[JobType]:
    """
    Convert Indeed attribute entries into JobType values.

    Each attribute's "label" is normalized (dashes and spaces stripped,
    lowercased) before enum lookup; labels with no match are skipped.

    :param attributes: attribute dicts, each carrying a "label" key
    :return: list of matched JobType values (possibly empty)
    """
    normalized_labels = (
        entry["label"].replace("-", "").replace(" ", "").lower()
        for entry in attributes
    )
    matches = (get_enum_from_job_type(label) for label in normalized_labels)
    return [match for match in matches if match]
def get_compensation(compensation: dict) -> Compensation | None:
    """
    Parse an Indeed job's compensation payload into a Compensation object.

    Prefers the explicit baseSalary; falls back to Indeed's estimated
    baseSalary when no explicit one is present.

    :param compensation: Indeed compensation dict with "baseSalary",
        "estimated", and currency fields
    :return: Compensation with interval, min/max amounts and currency,
        or None when no usable salary data exists
    """
    if not compensation["baseSalary"] and not compensation["estimated"]:
        return None
    # Explicit salary wins; otherwise use Indeed's estimate.
    comp = (
        compensation["baseSalary"]
        if compensation["baseSalary"]
        else compensation["estimated"]["baseSalary"]
    )
    if not comp:
        return None
    interval = get_compensation_interval(comp["unitOfWork"])
    if not interval:
        return None
    min_range = comp["range"].get("min")
    max_range = comp["range"].get("max")
    return Compensation(
        interval=interval,
        min_amount=int(min_range) if min_range is not None else None,
        max_amount=int(max_range) if max_range is not None else None,
        # Currency comes from whichever payload supplied the salary data.
        currency=(
            compensation["estimated"]["currencyCode"]
            if compensation["estimated"]
            else compensation["currencyCode"]
        ),
    )
def is_job_remote(job: dict, description: str) -> bool:
    """
    Decide whether a job is remote from its attributes and location.

    Checks attribute labels and the formatted location text for remote
    keywords. NOTE(review): the ``description`` argument is accepted for
    interface compatibility but is not currently searched.
    """
    remote_keywords = ("remote", "work from home", "wfh")

    def mentions_remote(text: str) -> bool:
        lowered = text.lower()
        return any(keyword in lowered for keyword in remote_keywords)

    if any(mentions_remote(attr["label"]) for attr in job["attributes"]):
        return True
    return mentions_remote(job["location"]["formatted"]["long"])
def get_compensation_interval(interval: str) -> CompensationInterval:
    """
    Translate Indeed's unit-of-work string into a CompensationInterval.

    :param interval: e.g. "YEAR", "HOUR" (case-insensitive)
    :return: matching CompensationInterval member
    :raises ValueError: when the unit is not recognized
    """
    mapping = {
        "DAY": "DAILY",
        "YEAR": "YEARLY",
        "HOUR": "HOURLY",
        "WEEK": "WEEKLY",
        "MONTH": "MONTHLY",
    }
    member_name = mapping.get(interval.upper())
    if member_name is None or member_name not in CompensationInterval.__members__:
        raise ValueError(f"Unsupported interval: {interval}")
    return CompensationInterval[member_name]

View File

@@ -1,47 +1,48 @@
"""
jobspy.scrapers.linkedin
~~~~~~~~~~~~~~~~~~~
This module contains routines to scrape LinkedIn.
"""
from __future__ import annotations
import math
import time
import random
import regex as re
from typing import Optional
import time
from datetime import datetime
from bs4.element import Tag
from bs4 import BeautifulSoup
from typing import Optional
from urllib.parse import urlparse, urlunparse, unquote
from .constants import headers
from .. import Scraper, ScraperInput, Site
from ..exceptions import LinkedInException
from ..utils import create_session, remove_attributes, create_logger
from ...jobs import (
import regex as re
from bs4 import BeautifulSoup
from bs4.element import Tag
from jobspy.exception import LinkedInException
from jobspy.linkedin.constant import headers
from jobspy.linkedin.util import (
job_type_code,
parse_job_type,
parse_job_level,
parse_company_industry,
)
from jobspy.model import (
JobPost,
Location,
JobResponse,
JobType,
Country,
Compensation,
DescriptionFormat,
Scraper,
ScraperInput,
Site,
)
from ..utils import (
from jobspy.util import (
extract_emails_from_text,
get_enum_from_job_type,
currency_parser,
markdown_converter,
create_session,
remove_attributes,
create_logger,
)
log = create_logger("LinkedIn")
class LinkedInScraper(Scraper):
class LinkedIn(Scraper):
base_url = "https://www.linkedin.com"
delay = 3
band_delay = 4
@@ -95,7 +96,7 @@ class LinkedInScraper(Scraper):
"distance": scraper_input.distance,
"f_WT": 2 if scraper_input.is_remote else None,
"f_JT": (
self.job_type_code(scraper_input.job_type)
job_type_code(scraper_input.job_type)
if scraper_input.job_type
else None
),
@@ -216,6 +217,8 @@ class LinkedInScraper(Scraper):
job_details = {}
if full_descr:
job_details = self._get_job_details(job_id)
description = description.replace(",", "")
return JobPost(
id=f"li-{job_id}",
@@ -282,9 +285,9 @@ class LinkedInScraper(Scraper):
)
return {
"description": description,
"job_level": self._parse_job_level(soup),
"company_industry": self._parse_company_industry(soup),
"job_type": self._parse_job_type(soup),
"job_level": parse_job_level(soup),
"company_industry": parse_company_industry(soup),
"job_type": parse_job_type(soup),
"job_url_direct": self._parse_job_url_direct(soup),
"company_logo": company_logo,
"job_function": job_function,
@@ -316,77 +319,6 @@ class LinkedInScraper(Scraper):
location = Location(city=city, state=state, country=country)
return location
@staticmethod
def _parse_job_type(soup_job_type: BeautifulSoup) -> list[JobType] | None:
"""
Gets the job type from job page
:param soup_job_type:
:return: JobType
"""
h3_tag = soup_job_type.find(
"h3",
class_="description__job-criteria-subheader",
string=lambda text: "Employment type" in text,
)
employment_type = None
if h3_tag:
employment_type_span = h3_tag.find_next_sibling(
"span",
class_="description__job-criteria-text description__job-criteria-text--criteria",
)
if employment_type_span:
employment_type = employment_type_span.get_text(strip=True)
employment_type = employment_type.lower()
employment_type = employment_type.replace("-", "")
return [get_enum_from_job_type(employment_type)] if employment_type else []
@staticmethod
def _parse_job_level(soup_job_level: BeautifulSoup) -> str | None:
"""
Gets the job level from job page
:param soup_job_level:
:return: str
"""
h3_tag = soup_job_level.find(
"h3",
class_="description__job-criteria-subheader",
string=lambda text: "Seniority level" in text,
)
job_level = None
if h3_tag:
job_level_span = h3_tag.find_next_sibling(
"span",
class_="description__job-criteria-text description__job-criteria-text--criteria",
)
if job_level_span:
job_level = job_level_span.get_text(strip=True)
return job_level
@staticmethod
def _parse_company_industry(soup_industry: BeautifulSoup) -> str | None:
"""
Gets the company industry from job page
:param soup_industry:
:return: str
"""
h3_tag = soup_industry.find(
"h3",
class_="description__job-criteria-subheader",
string=lambda text: "Industries" in text,
)
industry = None
if h3_tag:
industry_span = h3_tag.find_next_sibling(
"span",
class_="description__job-criteria-text description__job-criteria-text--criteria",
)
if industry_span:
industry = industry_span.get_text(strip=True)
return industry
def _parse_job_url_direct(self, soup: BeautifulSoup) -> str | None:
"""
Gets the job url direct from job page
@@ -403,13 +335,3 @@ class LinkedInScraper(Scraper):
job_url_direct = unquote(job_url_direct_match.group())
return job_url_direct
@staticmethod
def job_type_code(job_type_enum: JobType) -> str:
return {
JobType.FULL_TIME: "F",
JobType.PART_TIME: "P",
JobType.INTERNSHIP: "I",
JobType.CONTRACT: "C",
JobType.TEMPORARY: "T",
}.get(job_type_enum, "")

85
jobspy/linkedin/util.py Normal file
View File

@@ -0,0 +1,85 @@
from bs4 import BeautifulSoup
from jobspy.model import JobType
from jobspy.util import get_enum_from_job_type
def job_type_code(job_type_enum: JobType) -> str:
    """
    Map a JobType to LinkedIn's single-letter f_JT search-filter code.

    :param job_type_enum: job type to encode
    :return: LinkedIn code ("F", "P", "I", "C", "T"), or "" when unmapped
    """
    return {
        JobType.FULL_TIME: "F",
        JobType.PART_TIME: "P",
        JobType.INTERNSHIP: "I",
        JobType.CONTRACT: "C",
        JobType.TEMPORARY: "T",
    }.get(job_type_enum, "")
def parse_job_type(soup_job_type: BeautifulSoup) -> list[JobType] | None:
    """
    Extract the employment type from a LinkedIn job page.

    Finds the "Employment type" criteria header, reads its sibling value
    span, and normalizes the text (lowercased, dashes removed) before the
    enum lookup.

    :param soup_job_type: parsed job-page HTML
    :return: single-element list with the JobType, or [] when not found
    """
    h3_tag = soup_job_type.find(
        "h3",
        class_="description__job-criteria-subheader",
        string=lambda text: "Employment type" in text,
    )
    employment_type = None
    if h3_tag:
        # The value lives in the sibling criteria-text span.
        employment_type_span = h3_tag.find_next_sibling(
            "span",
            class_="description__job-criteria-text description__job-criteria-text--criteria",
        )
        if employment_type_span:
            employment_type = employment_type_span.get_text(strip=True)
            employment_type = employment_type.lower()
            employment_type = employment_type.replace("-", "")
    return [get_enum_from_job_type(employment_type)] if employment_type else []
def parse_job_level(soup_job_level: BeautifulSoup) -> str | None:
    """
    Read the "Seniority level" value from a LinkedIn job page.

    :param soup_job_level: parsed job-page HTML
    :return: seniority text, or None when the section is missing
    """
    header = soup_job_level.find(
        "h3",
        class_="description__job-criteria-subheader",
        string=lambda text: "Seniority level" in text,
    )
    if not header:
        return None
    value_span = header.find_next_sibling(
        "span",
        class_="description__job-criteria-text description__job-criteria-text--criteria",
    )
    if not value_span:
        return None
    return value_span.get_text(strip=True)
def parse_company_industry(soup_industry: BeautifulSoup) -> str | None:
    """
    Read the "Industries" value from a LinkedIn job page.

    :param soup_industry: parsed job-page HTML
    :return: industry text, or None when the section is missing
    """
    header = soup_industry.find(
        "h3",
        class_="description__job-criteria-subheader",
        string=lambda text: "Industries" in text,
    )
    if not header:
        return None
    value_span = header.find_next_sibling(
        "span",
        class_="description__job-criteria-text description__job-criteria-text--criteria",
    )
    if not value_span:
        return None
    return value_span.get_text(strip=True)

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Optional
from datetime import date
from enum import Enum
@@ -68,16 +69,20 @@ class Country(Enum):
AUSTRIA = ("austria", "at", "at")
BAHRAIN = ("bahrain", "bh")
BELGIUM = ("belgium", "be", "fr:be")
BULGARIA = ("bulgaria", "bg")
BRAZIL = ("brazil", "br", "com.br")
CANADA = ("canada", "ca", "ca")
CHILE = ("chile", "cl")
CHINA = ("china", "cn")
COLOMBIA = ("colombia", "co")
COSTARICA = ("costa rica", "cr")
CROATIA = ("croatia", "hr")
CYPRUS = ("cyprus", "cy")
CZECHREPUBLIC = ("czech republic,czechia", "cz")
DENMARK = ("denmark", "dk")
ECUADOR = ("ecuador", "ec")
EGYPT = ("egypt", "eg")
ESTONIA = ("estonia", "ee")
FINLAND = ("finland", "fi")
FRANCE = ("france", "fr", "fr")
GERMANY = ("germany", "de", "de")
@@ -91,6 +96,8 @@ class Country(Enum):
ITALY = ("italy", "it", "it")
JAPAN = ("japan", "jp")
KUWAIT = ("kuwait", "kw")
LATVIA = ("latvia", "lv")
LITHUANIA = ("lithuania", "lt")
LUXEMBOURG = ("luxembourg", "lu")
MALAYSIA = ("malaysia", "malaysia:my", "com")
MALTA = ("malta", "malta:mt", "mt")
@@ -111,6 +118,8 @@ class Country(Enum):
ROMANIA = ("romania", "ro")
SAUDIARABIA = ("saudi arabia", "sa")
SINGAPORE = ("singapore", "sg", "sg")
SLOVAKIA = ("slovakia", "sk")
SLOVENIA = ("slovenia", "sl")
SOUTHAFRICA = ("south africa", "za")
SOUTHKOREA = ("south korea", "kr")
SPAIN = ("spain", "es", "es")
@@ -265,3 +274,49 @@ class JobPost(BaseModel):
class JobResponse(BaseModel):
jobs: list[JobPost] = []
class Site(Enum):
LINKEDIN = "linkedin"
INDEED = "indeed"
ZIP_RECRUITER = "zip_recruiter"
GLASSDOOR = "glassdoor"
GOOGLE = "google"
BAYT = "bayt"
class SalarySource(Enum):
DIRECT_DATA = "direct_data"
DESCRIPTION = "description"
class ScraperInput(BaseModel):
site_type: list[Site]
search_term: str | None = None
google_search_term: str | None = None
location: str | None = None
country: Country | None = Country.USA
distance: int | None = None
is_remote: bool = False
job_type: JobType | None = None
easy_apply: bool | None = None
offset: int = 0
linkedin_fetch_description: bool = False
linkedin_company_ids: list[int] | None = None
description_format: DescriptionFormat | None = DescriptionFormat.MARKDOWN
results_wanted: int = 15
hours_old: int | None = None
class Scraper(ABC):
    """Abstract base class for the per-site scrapers (LinkedIn, Indeed, etc.)."""

    def __init__(
        self, site: Site, proxies: list[str] | None = None, ca_cert: str | None = None
    ):
        """
        :param site: which job board this scraper targets
        :param proxies: optional proxy list for outbound requests
        :param ca_cert: optional CA certificate — presumably a bundle
            path handed to the HTTP session; TODO confirm against
            create_session
        """
        self.site = site
        self.proxies = proxies
        self.ca_cert = ca_cert

    @abstractmethod
    def scrape(self, scraper_input: ScraperInput) -> JobResponse: ...

View File

@@ -11,7 +11,7 @@ import urllib3
from markdownify import markdownify as md
from requests.adapters import HTTPAdapter, Retry
from ..jobs import CompensationInterval, JobType
from jobspy.model import CompensationInterval, JobType, Site
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
@@ -286,3 +286,62 @@ def extract_job_type(description: str):
listing_types.append(key)
return listing_types if listing_types else None
def map_str_to_site(site_name: str) -> Site:
    """
    Look up a Site enum member by name, case-insensitively.

    :param site_name: e.g. "indeed", "LinkedIn"
    :return: matching Site member
    :raises KeyError: when the name is not a Site member
    """
    return Site[site_name.upper()]
def get_enum_from_value(value_str):
    """
    Find the JobType whose value list contains the given string.

    :param value_str: raw job-type text
    :return: matching JobType member
    :raises ValueError: when no JobType matches (was a bare Exception;
        ValueError is more precise and still caught by ``except Exception``)
    """
    for job_type in JobType:
        if value_str in job_type.value:
            return job_type
    raise ValueError(f"Invalid job type: {value_str}")
def convert_to_annual(job_data: dict):
    """
    Convert a salary record to a yearly interval, in place.

    Multiplies min_amount/max_amount by the annualization factor for the
    record's current interval (2080 work hours, 12 months, 52 weeks, 260
    working days) and sets interval to "yearly". Unknown intervals keep
    their amounts and are simply relabeled "yearly", matching the
    original if-chain's behavior.

    :param job_data: dict with "interval", "min_amount", "max_amount"
    """
    annual_factors = {"hourly": 2080, "monthly": 12, "weekly": 52, "daily": 260}
    factor = annual_factors.get(job_data["interval"])
    if factor is not None:
        job_data["min_amount"] *= factor
        job_data["max_amount"] *= factor
    job_data["interval"] = "yearly"
desired_order = [
"id",
"site",
"job_url",
"job_url_direct",
"title",
"company",
"location",
"date_posted",
"job_type",
"salary_source",
"interval",
"min_amount",
"max_amount",
"currency",
"is_remote",
"job_level",
"job_function",
"listing_type",
"emails",
"description",
"company_industry",
"company_url",
"company_logo",
"company_url_direct",
"company_addresses",
"company_num_employees",
"company_revenue",
"company_description",
]

View File

@@ -1,10 +1,3 @@
"""
jobspy.scrapers.ziprecruiter
~~~~~~~~~~~~~~~~~~~
This module contains routines to scrape ZipRecruiter.
"""
from __future__ import annotations
import json
@@ -13,33 +6,34 @@ import re
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from typing import Optional, Tuple, Any
from bs4 import BeautifulSoup
from .constants import headers
from .. import Scraper, ScraperInput, Site
from ..utils import (
from jobspy.ziprecruiter.constant import headers, get_cookie_data
from jobspy.util import (
extract_emails_from_text,
create_session,
markdown_converter,
remove_attributes,
create_logger,
)
from ...jobs import (
from jobspy.model import (
JobPost,
Compensation,
Location,
JobResponse,
JobType,
Country,
DescriptionFormat,
Scraper,
ScraperInput,
Site,
)
from jobspy.ziprecruiter.util import get_job_type_enum, add_params
log = create_logger("ZipRecruiter")
class ZipRecruiterScraper(Scraper):
class ZipRecruiter(Scraper):
base_url = "https://www.ziprecruiter.com"
api_url = "https://api.ziprecruiter.com"
@@ -90,7 +84,7 @@ class ZipRecruiterScraper(Scraper):
def _find_jobs_in_page(
self, scraper_input: ScraperInput, continue_token: str | None = None
) -> Tuple[list[JobPost], Optional[str]]:
) -> tuple[list[JobPost], str | None]:
"""
Scrapes a page of ZipRecruiter for jobs with scraper_input criteria
:param scraper_input:
@@ -98,7 +92,7 @@ class ZipRecruiterScraper(Scraper):
:return: jobs found on page
"""
jobs_list = []
params = self._add_params(scraper_input)
params = add_params(scraper_input)
if continue_token:
params["continue_from"] = continue_token
try:
@@ -151,7 +145,7 @@ class ZipRecruiterScraper(Scraper):
location = Location(
city=job.get("job_city"), state=job.get("job_state"), country=country_enum
)
job_type = self._get_job_type_enum(
job_type = get_job_type_enum(
job.get("employment_type", "").replace("_", "").lower()
)
date_posted = datetime.fromisoformat(job["posted_time"].rstrip("Z")).date()
@@ -200,13 +194,17 @@ class ZipRecruiterScraper(Scraper):
else ""
)
description_full = job_description_clean + company_description_clean
script_tag = soup.find("script", type="application/json")
if script_tag:
job_json = json.loads(script_tag.string)
job_url_val = job_json["model"].get("saveJobURL", "")
m = re.search(r"job_url=(.+)", job_url_val)
if m:
job_url_direct = m.group(1)
try:
script_tag = soup.find("script", type="application/json")
if script_tag:
job_json = json.loads(script_tag.string)
job_url_val = job_json["model"].get("saveJobURL", "")
m = re.search(r"job_url=(.+)", job_url_val)
if m:
job_url_direct = m.group(1)
except:
job_url_direct = None
if self.scraper_input.description_format == DescriptionFormat.MARKDOWN:
description_full = markdown_converter(description_full)
@@ -217,51 +215,5 @@ class ZipRecruiterScraper(Scraper):
"""
Sends a session event to the API with device properties.
"""
data = [
("event_type", "session"),
("logged_in", "false"),
("number_of_retry", "1"),
("property", "model:iPhone"),
("property", "os:iOS"),
("property", "locale:en_us"),
("property", "app_build_number:4734"),
("property", "app_version:91.0"),
("property", "manufacturer:Apple"),
("property", "timestamp:2025-01-12T12:04:42-06:00"),
("property", "screen_height:852"),
("property", "os_version:16.6.1"),
("property", "source:install"),
("property", "screen_width:393"),
("property", "device_model:iPhone 14 Pro"),
("property", "brand:Apple"),
]
url = f"{self.api_url}/jobs-app/event"
self.session.post(url, data=data)
@staticmethod
def _get_job_type_enum(job_type_str: str) -> list[JobType] | None:
for job_type in JobType:
if job_type_str in job_type.value:
return [job_type]
return None
@staticmethod
def _add_params(scraper_input) -> dict[str, str | Any]:
params = {
"search": scraper_input.search_term,
"location": scraper_input.location,
}
if scraper_input.hours_old:
params["days"] = max(scraper_input.hours_old // 24, 1)
job_type_map = {JobType.FULL_TIME: "full_time", JobType.PART_TIME: "part_time"}
if scraper_input.job_type:
job_type = scraper_input.job_type
params["employment_type"] = job_type_map.get(job_type, job_type.value[0])
if scraper_input.easy_apply:
params["zipapply"] = 1
if scraper_input.is_remote:
params["remote"] = 1
if scraper_input.distance:
params["radius"] = scraper_input.distance
return {k: v for k, v in params.items() if v is not None}
self.session.post(url, data=get_cookie_data)

View File

@@ -0,0 +1,29 @@
headers = {
"Host": "api.ziprecruiter.com",
"accept": "*/*",
"x-zr-zva-override": "100000000;vid:ZT1huzm_EQlDTVEc",
"x-pushnotificationid": "0ff4983d38d7fc5b3370297f2bcffcf4b3321c418f5c22dd152a0264707602a0",
"x-deviceid": "D77B3A92-E589-46A4-8A39-6EF6F1D86006",
"user-agent": "Job Search/87.0 (iPhone; CPU iOS 16_6_1 like Mac OS X)",
"authorization": "Basic YTBlZjMyZDYtN2I0Yy00MWVkLWEyODMtYTI1NDAzMzI0YTcyOg==",
"accept-language": "en-US,en;q=0.9",
}
get_cookie_data = [
("event_type", "session"),
("logged_in", "false"),
("number_of_retry", "1"),
("property", "model:iPhone"),
("property", "os:iOS"),
("property", "locale:en_us"),
("property", "app_build_number:4734"),
("property", "app_version:91.0"),
("property", "manufacturer:Apple"),
("property", "timestamp:2025-01-12T12:04:42-06:00"),
("property", "screen_height:852"),
("property", "os_version:16.6.1"),
("property", "source:install"),
("property", "screen_width:393"),
("property", "device_model:iPhone 14 Pro"),
("property", "brand:Apple"),
]

View File

@@ -0,0 +1,31 @@
from jobspy.model import JobType
def add_params(scraper_input) -> dict[str, str | int]:
    """
    Build ZipRecruiter API query parameters from the scraper input.

    :param scraper_input: search criteria (term, location, filters)
    :return: query params with None values stripped
    """
    params: dict[str, str | int] = {
        "search": scraper_input.search_term,
        "location": scraper_input.location,
    }
    if scraper_input.hours_old:
        # API takes whole days; never send 0.
        params["days"] = max(scraper_input.hours_old // 24, 1)
    if scraper_input.job_type:
        type_names = {JobType.FULL_TIME: "full_time", JobType.PART_TIME: "part_time"}
        params["employment_type"] = type_names.get(
            scraper_input.job_type, scraper_input.job_type.value[0]
        )
    if scraper_input.easy_apply:
        params["zipapply"] = 1
    if scraper_input.is_remote:
        params["remote"] = 1
    if scraper_input.distance:
        params["radius"] = scraper_input.distance
    return {key: value for key, value in params.items() if value is not None}
def get_job_type_enum(job_type_str: str) -> list[JobType] | None:
    """
    Map a raw employment-type string to the matching JobType.

    :param job_type_str: normalized employment-type text
    :return: single-element list with the JobType, or None when unmatched
    """
    for job_type in JobType:
        if job_type_str in job_type.value:
            return [job_type]
    return None

1159
jobspy_output.csv Normal file

File diff suppressed because it is too large Load Diff

236
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -4,21 +4,20 @@ build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "python-jobspy"
version = "1.1.76"
description = "Job scraper for LinkedIn, Indeed, Glassdoor & ZipRecruiter"
authors = [ "Zachary Hampton <zachary@bunsly.com>", "Cullen Watson <cullen@bunsly.com>",]
homepage = "https://github.com/Bunsly/JobSpy"
version = "1.1.78"
description = "Job scraper for LinkedIn, Indeed, Glassdoor, ZipRecruiter & Bayt"
authors = ["Cullen Watson <cullen@cullenwatson.com>", "Zachary Hampton <zachary@zacharysproducts.com>"]
homepage = "https://github.com/cullenwatson/JobSpy"
readme = "README.md"
keywords = [ "jobs-scraper", "linkedin", "indeed", "glassdoor", "ziprecruiter",]
keywords = [ "jobs-scraper", "linkedin", "indeed", "glassdoor", "ziprecruiter", "bayt"]
[[tool.poetry.packages]]
include = "jobspy"
from = "src"
[tool.black]
line-length = 88
[tool.poetry.dependencies]
python = "^3.10"
python = "^3.10 || ^3.12"
requests = "^2.31.0"
beautifulsoup4 = "^4.12.2"
pandas = "^2.1.0"
@@ -29,7 +28,6 @@ markdownify = "^0.13.1"
regex = "^2024.4.28"
[tool.poetry.group.dev.dependencies]
pytest = "^7.4.1"
jupyter = "^1.0.0"
black = "*"
pre-commit = "*"

118
requirements.txt Normal file
View File

@@ -0,0 +1,118 @@
annotated-types==0.7.0
anyio==4.6.2.post1
argon2-cffi==23.1.0
argon2-cffi-bindings==21.2.0
arrow==1.3.0
asttokens==2.4.1
async-lru==2.0.4
attrs==24.2.0
babel==2.16.0
beautifulsoup4==4.12.3
black==24.10.0
bleach==6.1.0
certifi==2024.8.30
cffi==1.17.1
cfgv==3.4.0
charset-normalizer==3.4.0
click==8.1.7
comm==0.2.2
debugpy==1.8.7
decorator==5.1.1
defusedxml==0.7.1
distlib==0.3.9
executing==2.1.0
fastjsonschema==2.20.0
filelock==3.16.1
fqdn==1.5.1
h11==0.14.0
httpcore==1.0.6
httpx==0.27.2
identify==2.6.1
idna==3.10
ipykernel==6.29.5
ipython==8.28.0
ipywidgets==8.1.5
isoduration==20.11.0
jedi==0.19.1
Jinja2==3.1.4
json5==0.9.25
jsonpointer==3.0.0
jsonschema==4.23.0
jsonschema-specifications==2024.10.1
jupyter==1.1.1
jupyter-console==6.6.3
jupyter-events==0.10.0
jupyter-lsp==2.2.5
jupyter_client==8.6.3
jupyter_core==5.7.2
jupyter_server==2.14.2
jupyter_server_terminals==0.5.3
jupyterlab==4.2.5
jupyterlab_pygments==0.3.0
jupyterlab_server==2.27.3
jupyterlab_widgets==3.0.13
markdownify==0.13.1
MarkupSafe==3.0.2
matplotlib-inline==0.1.7
mistune==3.0.2
mypy-extensions==1.0.0
nbclient==0.10.0
nbconvert==7.16.4
nbformat==5.10.4
nest-asyncio==1.6.0
nodeenv==1.9.1
notebook==7.2.2
notebook_shim==0.2.4
numpy==1.26.3
overrides==7.7.0
packaging==24.1
pandas==2.2.3
pandocfilters==1.5.1
parso==0.8.4
pathspec==0.12.1
pexpect==4.9.0
platformdirs==4.3.6
pre_commit==4.0.1
prometheus_client==0.21.0
prompt_toolkit==3.0.48
psutil==6.1.0
ptyprocess==0.7.0
pure_eval==0.2.3
pycparser==2.22
pydantic==2.9.2
pydantic_core==2.23.4
Pygments==2.18.0
python-dateutil==2.9.0.post0
-e git+https://github.com/fakebranden/JobSpy@60819a8fcabbd3eaba7741b673023612dc3d3692#egg=python_jobspy
python-json-logger==2.0.7
pytz==2024.2
PyYAML==6.0.2
pyzmq==26.2.0
referencing==0.35.1
regex==2024.9.11
requests==2.32.3
rfc3339-validator==0.1.4
rfc3986-validator==0.1.1
rpds-py==0.20.0
Send2Trash==1.8.3
setuptools==75.2.0
six==1.16.0
sniffio==1.3.1
soupsieve==2.6
stack-data==0.6.3
terminado==0.18.1
tinycss2==1.3.0
tls-client==1.0.1
tornado==6.4.1
traitlets==5.14.3
types-python-dateutil==2.9.0.20241003
typing_extensions==4.12.2
tzdata==2024.2
uri-template==1.3.0
urllib3==2.2.3
virtualenv==20.27.0
wcwidth==0.2.13
webcolors==24.8.0
webencodings==0.5.1
websocket-client==1.8.0
widgetsnbextension==4.0.13

View File

@@ -1,58 +0,0 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from ..jobs import (
Enum,
BaseModel,
JobType,
JobResponse,
Country,
DescriptionFormat,
)
class Site(Enum):
LINKEDIN = "linkedin"
INDEED = "indeed"
ZIP_RECRUITER = "zip_recruiter"
GLASSDOOR = "glassdoor"
GOOGLE = "google"
BAYT = "bayt"
class SalarySource(Enum):
DIRECT_DATA = "direct_data"
DESCRIPTION = "description"
class ScraperInput(BaseModel):
site_type: list[Site]
search_term: str | None = None
google_search_term: str | None = None
location: str | None = None
country: Country | None = Country.USA
distance: int | None = None
is_remote: bool = False
job_type: JobType | None = None
easy_apply: bool | None = None
offset: int = 0
linkedin_fetch_description: bool = False
linkedin_company_ids: list[int] | None = None
description_format: DescriptionFormat | None = DescriptionFormat.MARKDOWN
results_wanted: int = 15
hours_old: int | None = None
class Scraper(ABC):
def __init__(
self, site: Site, proxies: list[str] | None = None, ca_cert: str | None = None
):
self.site = site
self.proxies = proxies
self.ca_cert = ca_cert
@abstractmethod
def scrape(self, scraper_input: ScraperInput) -> JobResponse: ...

View File

@@ -1,10 +0,0 @@
headers = {
"Host": "api.ziprecruiter.com",
"accept": "*/*",
"x-zr-zva-override": "100000000;vid:ZT1huzm_EQlDTVEc",
"x-pushnotificationid": "0ff4983d38d7fc5b3370297f2bcffcf4b3321c418f5c22dd152a0264707602a0",
"x-deviceid": "D77B3A92-E589-46A4-8A39-6EF6F1D86006",
"user-agent": "Job Search/87.0 (iPhone; CPU iOS 16_6_1 like Mac OS X)",
"authorization": "Basic YTBlZjMyZDYtN2I0Yy00MWVkLWEyODMtYTI1NDAzMzI0YTcyOg==",
"accept-language": "en-US,en;q=0.9",
}