Compare commits

...

77 Commits

Author SHA1 Message Date
fakebranden
77cc1f8550 update for artifact with run ID 2025-04-15 09:01:33 +00:00
fakebranden
84b4524c43 fix the create or modify output file in folder 2025-04-15 08:30:44 +00:00
fakebranden
e6ae23c76f update output csv in yml for correct format 2025-04-15 08:06:36 +00:00
fakebranden
0103e11234 add test file to outputs for visibility 2025-04-15 08:01:10 +00:00
fakebranden
697ae5c8c9 delete manual output file from testing 2025-04-15 07:49:44 +00:00
fakebranden
9e0674f7fc updated yml so jobspy scraper runs properly 2025-04-15 07:38:56 +00:00
fakebranden
bbdad3584e updates to capital letter in configs files 2025-04-15 07:34:20 +00:00
fakebranden
a045bb442a add configs folder 2025-04-15 06:51:22 +00:00
fakebranden
3eb4c122e7 Delete configs/config_branden_at_autoemployme_onmicrosoft_com.json 2025-04-15 02:26:08 -04:00
fakebranden
74877c5fd8 Delete configs/config_Branden_at_autoemployme_onmicrosoft_com.json 2025-04-15 02:26:00 -04:00
JobSpy Bot
0a475e312f 🔄 Updated config for Branden@autoemployme.onmicrosoft.com 2025-04-15 02:11:26 -04:00
JobSpy Bot
e0514d218e 🔄 Updated config for Branden@autoemployme.onmicrosoft.com 2025-04-15 01:25:35 -04:00
fakebranden
529aa8a1f4 fixed configs and outputs file paths add & modify 2025-04-15 02:13:24 +00:00
fakebranden
93a21941eb outputs folder added sample file 2025-04-15 01:54:37 +00:00
fakebranden
8f8b39c6e2 outputs and configs folder added 2025-04-15 01:52:03 +00:00
fakebranden
cdcd79edfe add configs folder 2025-04-15 00:46:30 +00:00
fakebranden
89a40dc3e3 updated py and yml dynamic 2025-04-14 23:39:28 +00:00
fakebranden
6a326b7dd4 dynamic yml and py update 2025-04-14 21:37:07 +00:00
fakebranden
0a5c5fa9b3 yml matches dynamic output 2025-04-14 21:26:28 +00:00
fakebranden
e22e4cc092 updated dynamic 2025-04-14 21:02:02 +00:00
fakebranden
0abe28fae4 further dynamic updates to scraper for output 2025-04-14 19:00:30 +00:00
fakebranden
31d0389dd8 updated dynamic workflow added 2025-04-14 18:30:34 +00:00
fakebranden
fb9ab3a315 dynamic jobscraper py and config file 2025-04-14 18:21:11 +00:00
fakebranden
c34eff610f updated criteria 2025-04-07 16:12:53 +00:00
fakebranden
e9160a0b4c adjusted scraper for better delimiter and comma only between records 2025-03-12 00:47:10 +00:00
fakebranden
cd916c7978 reverted ziprecruiter 2025-03-12 00:16:09 +00:00
fakebranden
25c084ca2c removed commas in fields 2025-03-12 00:03:02 +00:00
fakebranden
341deba465 updated job description no limit 2025-03-10 19:40:12 +00:00
fakebranden
5337b3ec7f new exact job scraper 2025-03-10 19:11:36 +00:00
fakebranden
0171ecc4a0 update search criteria format 2025-03-10 05:05:17 +00:00
fakebranden
e191405c8e change actions to read 2025-03-08 09:16:16 +00:00
fakebranden
a2d139cb96 removed schedule cron so power automate can trigger the workflow 2025-03-07 21:54:00 +00:00
fakebranden
9e41e6e9db fixed yml file 2025-03-07 21:26:09 +00:00
fakebranden
bb7d4c55ed updated yml from requirements.txt# 2025-03-07 21:23:16 +00:00
fakebranden
58cc1937bb added req. 2025-03-07 21:21:01 +00:00
fakebranden
60819a8fca Merge branch 'main' of https://github.com/fakebranden/JobSpy 2025-03-07 21:15:32 +00:00
fakebranden
1c59cd6738 git add requirements.txt
git commit -m "Added requirements.txt"
git push origin main
2025-03-07 20:55:22 +00:00
fakebranden
eed96e4c04 Create requirements.txt 2025-03-07 15:53:26 -05:00
fakebranden
83c64f4bca Update jobspy_scraper.yml 2025-03-07 15:43:59 -05:00
fakebranden
d8ad9da1c0 Update jobspy_scraper.yml 2025-03-07 15:39:12 -05:00
fakebranden
5f5738eaaa new yml 2025-03-07 19:18:44 +00:00
fakebranden
e1da326317 all funtionality 2025-03-07 18:57:14 +00:00
Cullen Watson
6782b9884e fix:workflow 2025-03-01 14:49:31 -06:00
Cullen Watson
94c74d60f2 enh:workflow manual run 2025-03-01 14:47:24 -06:00
Cullen Watson
5463e5a664 chore:version 2025-03-01 14:38:25 -06:00
arkhy
ed139e7e6b added missing EU countries and languages (#250)
Co-authored-by: Kate Arkhangelskaya <ekar559e@tu-dresden.de>
2025-03-01 14:30:08 -06:00
Cullen Watson
5bd199d0a5 Merge branch 'main' of https://github.com/Bunsly/JobSpy 2025-02-21 14:15:06 -06:00
Cullen Watson
4ec308a302 refactor:organize code 2025-02-21 14:14:55 -06:00
Cullen Watson
7cb0c518fc docs:readme 2025-02-21 12:53:59 -06:00
Cullen Watson
df70d4bc2e minor 2025-02-21 12:35:31 -06:00
Cullen Watson
3006063875 enh:remove log by default 2025-02-21 12:31:04 -06:00
Abdulrahman Hisham
1be009b8bc Adding Bayt.com Scraper to current codebase (#246) 2025-02-21 12:29:54 -06:00
Cullen Watson
81ed9b3ddf enh:remove log by default 2025-02-21 12:29:28 -06:00
Abdulrahman Al Muaitah
11a9e9a56a Fixed Bayt scraper integration 2025-02-21 20:10:02 +04:00
Abdulrahman Al Muaitah
c6ade14784 Added Bayt Scraper integration 2025-02-21 15:31:29 +04:00
Cullen Watson
13c74a0fed docs:readme 2025-02-09 13:42:18 -06:00
Cullen Watson
333e9e6760 docs:readme 2025-01-17 21:44:49 -06:00
github-actions
04032a0f91 Increment version 2024-12-04 22:55:06 +00:00
Cullen Watson
496896d0b5 enh:fix yml (#225) 2024-12-04 16:54:52 -06:00
Cullen Watson
87ba1ad1bf fix yml 2024-12-04 16:52:15 -06:00
Jason Geffner
4e7ac9a583 Fix Google job search (#223)
The previous regex did not capture all expected matches in the returned content
2024-12-04 16:45:59 -06:00
Cullen Watson
e44d13e1cf enh:auto update version 2024-12-04 16:29:38 -06:00
Cullen Watson
d52e366ef7 docs:readme 2024-11-26 15:51:26 -06:00
Cullen Watson
395ebf0017 docs:readme 2024-11-26 15:49:12 -06:00
Cullen Watson
63fddd9b7f docs:readme 2024-11-26 15:48:22 -06:00
Cullen Watson
58956868ae docs:readme 2024-11-26 15:47:10 -06:00
Cullen Watson
4fce836222 docs:readme 2024-10-28 03:53:59 -05:00
Cullen Watson
5ba25e7a7c docs:readme 2024-10-28 03:42:19 -05:00
Cullen Watson
f7cb3e9206 docs:readme 2024-10-28 03:36:21 -05:00
Cullen Watson
3ad3f121f7 docs:readme 2024-10-28 03:34:52 -05:00
Cullen Watson
ff3c782912 docs:readme 2024-10-25 18:12:08 -05:00
Cullen Watson
338d854b96 fix(google): search (#216) 2024-10-25 14:54:14 -05:00
Cullen Watson
811d4c40b4 chore:version 2024-10-24 15:28:25 -05:00
Cullen Watson
dba92d22c2 chore:version 2024-10-24 15:27:16 -05:00
Cullen Watson
10a3592a0f docs:file 2024-10-24 15:26:49 -05:00
Cullen Watson
b7905cc756 docs:file 2024-10-24 15:24:18 -05:00
Cullen Watson
6867d58829 docs:readme 2024-10-24 15:22:31 -05:00
44 changed files with 2825 additions and 878 deletions

View File

@@ -0,0 +1,49 @@
name: JobSpy Scraper Dynamic Workflow
on:
workflow_dispatch:
inputs:
user_email:
description: 'Email of user'
required: true
default: 'Branden@autoemployme.onmicrosoft.com'
permissions:
contents: read
id-token: write
jobs:
scrape_jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout Repo
uses: actions/checkout@v3
- name: Set Up Python
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install Dependencies
run: |
pip install --upgrade pip
pip install -r requirements.txt
- name: Sanitize Email + Create Run ID
id: vars
run: |
safe_email=$(echo "${{ github.event.inputs.user_email }}" | sed 's/@/_at_/g; s/\./_/g')
run_id=$(date +%s)
echo "safe_email=$safe_email" >> $GITHUB_OUTPUT
echo "run_id=$run_id" >> $GITHUB_OUTPUT
- name: Run Job Scraper
run: |
python job_scraper_dynamic.py "${{ github.event.inputs.user_email }}" "${{ steps.vars.outputs.run_id }}"
- name: Upload Output Artifact
uses: actions/upload-artifact@v4
with:
name: jobspy_output_${{ steps.vars.outputs.safe_email }}_${{ steps.vars.outputs.run_id }}
path: outputs/jobspy_output_${{ steps.vars.outputs.safe_email }}_${{ steps.vars.outputs.run_id }}.csv

48
.github/workflows/jobspy_scraper.yml vendored Normal file
View File

@@ -0,0 +1,48 @@
name: JobSpy Scraper Workflow
on:
workflow_dispatch: # Allows manual trigger from GitHub or Power Automate
# Remove or comment out the schedule to prevent auto-runs
# schedule:
# - cron: '0 */6 * * *' # Runs every 6 hours (DISABLED)
permissions:
actions: read
contents: read
id-token: write
jobs:
scrape_jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.10'
- name: Install dependencies
run: |
python -m pip install --upgrade pip
pip install -r requirements.txt
- name: Run JobSpy Scraper
run: python job_scraper_exact_match.py
- name: Debug - Check if jobspy_output.csv exists
run: |
if [ ! -f jobspy_output.csv ]; then
echo "❌ ERROR: jobspy_output.csv not found!"
exit 1
else
echo "✅ jobspy_output.csv found, proceeding to upload..."
fi
- name: Upload JobSpy Output as Artifact
uses: actions/upload-artifact@v4 # Explicitly using latest version
with:
name: jobspy-results
path: jobspy_output.csv

View File

@@ -1,9 +1,13 @@
name: Publish Python 🐍 distributions 📦 to PyPI
on: push
name: Publish JobSpy to PyPi
on:
push:
branches:
- main
workflow_dispatch:
jobs:
build-n-publish:
name: Build and publish Python 🐍 distributions 📦 to PyPI
name: Build and publish JobSpy to PyPi
runs-on: ubuntu-latest
steps:
@@ -27,7 +31,7 @@ jobs:
build
- name: Publish distribution 📦 to PyPI
if: startsWith(github.ref, 'refs/tags')
if: startsWith(github.ref, 'refs/tags') || github.event_name == 'workflow_dispatch'
uses: pypa/gh-action-pypi-publish@release/v1
with:
password: ${{ secrets.PYPI_API_TOKEN }}

View File

@@ -1,22 +0,0 @@
name: Python Tests
on:
pull_request:
branches:
- main
jobs:
test:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- name: Set up Python
uses: actions/setup-python@v2
with:
python-version: '3.8'
- name: Install dependencies
run: |
pip install poetry
poetry install
- name: Run tests
run: poetry run pytest tests/test_all.py

174
README.md
View File

@@ -1,17 +1,12 @@
<img src="https://github.com/cullenwatson/JobSpy/assets/78247585/ae185b7e-e444-4712-8bb9-fa97f53e896b" width="400">
**JobSpy** is a simple, yet comprehensive, job scraping library.
**Not technical?** Try out the web scraping tool on our site at [usejobspy.com](https://usejobspy.com).
*Looking to build a data-focused software product?* **[Book a call](https://bunsly.com/)** *to
work with us.*
**JobSpy** is a job scraping library with the goal of aggregating all the jobs from popular job boards with one tool.
## Features
- Scrapes job postings from **LinkedIn**, **Indeed**, **Glassdoor**, **Google**, & **ZipRecruiter** simultaneously
- Aggregates the job postings in a Pandas DataFrame
- Proxies support
- Scrapes job postings from **LinkedIn**, **Indeed**, **Glassdoor**, **Google**, **ZipRecruiter**, & **Bayt** concurrently
- Aggregates the job postings in a dataframe
- Proxies support to bypass blocking
![jobspy](https://github.com/cullenwatson/JobSpy/assets/78247585/ec7ef355-05f6-4fd3-8161-a817e31c5c57)
@@ -30,16 +25,16 @@ import csv
from jobspy import scrape_jobs
jobs = scrape_jobs(
site_name=["indeed", "linkedin", "zip_recruiter", "glassdoor", "google"],
site_name=["indeed", "linkedin", "zip_recruiter", "glassdoor", "google", "bayt"],
search_term="software engineer",
google_search_term="software engineer jobs near San Francisco, CA since yesterday",
location="San Francisco, CA",
results_wanted=20,
hours_old=72, # (only Linkedin/Indeed is hour specific, others round up to days old)
country_indeed='USA', # only needed for indeed / glassdoor
hours_old=72,
country_indeed='USA',
# linkedin_fetch_description=True # get more info such as full description, direct job url for linkedin (slower)
# linkedin_fetch_description=True # gets more info such as description, direct job url (slower)
# proxies=["208.195.175.46:65095", "208.195.175.45:65095", "localhost"],
)
print(f"Found {len(jobs)} jobs")
print(jobs.head())
@@ -63,10 +58,13 @@ zip_recruiter Software Developer TEKsystems Phoenix
```plaintext
Optional
├── site_name (list|str):
| linkedin, zip_recruiter, indeed, glassdoor
| (default is all four)
| linkedin, zip_recruiter, indeed, glassdoor, google, bayt
| (default is all)
├── search_term (str)
|
├── google_search_term (str)
| search term for google jobs. This is the only param for filtering google jobs.
├── location (str)
@@ -86,7 +84,7 @@ Optional
| number of job results to retrieve for each site specified in 'site_name'
├── easy_apply (bool):
| filters for jobs that are hosted on the job board site
| filters for jobs that are hosted on the job board site (LinkedIn easy apply filter no longer works)
├── description_format (str):
| markdown, html (Format type of the job descriptions. Default is markdown.)
@@ -131,6 +129,84 @@ Optional
| - easy_apply
```
## Supported Countries for Job Searching
### **LinkedIn**
LinkedIn searches globally & uses only the `location` parameter.
### **ZipRecruiter**
ZipRecruiter searches for jobs in **US/Canada** & uses only the `location` parameter.
### **Indeed / Glassdoor**
Indeed & Glassdoor supports most countries, but the `country_indeed` parameter is required. Additionally, use the `location`
parameter to narrow down the location, e.g. city & state if necessary.
You can specify the following countries when searching on Indeed (use the exact name, * indicates support for Glassdoor):
| | | | |
|----------------------|--------------|------------|----------------|
| Argentina | Australia* | Austria* | Bahrain |
| Belgium* | Brazil* | Canada* | Chile |
| China | Colombia | Costa Rica | Czech Republic |
| Denmark | Ecuador | Egypt | Finland |
| France* | Germany* | Greece | Hong Kong* |
| Hungary | India* | Indonesia | Ireland* |
| Israel | Italy* | Japan | Kuwait |
| Luxembourg | Malaysia | Mexico* | Morocco |
| Netherlands* | New Zealand* | Nigeria | Norway |
| Oman | Pakistan | Panama | Peru |
| Philippines | Poland | Portugal | Qatar |
| Romania | Saudi Arabia | Singapore* | South Africa |
| South Korea | Spain* | Sweden | Switzerland* |
| Taiwan | Thailand | Turkey | Ukraine |
| United Arab Emirates | UK* | USA* | Uruguay |
| Venezuela | Vietnam* | | |
### **Bayt**
Bayt only uses the search_term parameter currently and searches internationally
## Notes
* Indeed is the best scraper currently with no rate limiting.
* All the job board endpoints are capped at around 1000 jobs on a given search.
* LinkedIn is the most restrictive and usually rate limits around the 10th page with one ip. Proxies are a must basically.
## Frequently Asked Questions
---
**Q: Why is Indeed giving unrelated roles?**
**A:** Indeed searches the description too.
- use - to remove words
- "" for exact match
Example of a good Indeed query
```py
search_term='"engineering intern" software summer (java OR python OR c++) 2025 -tax -marketing'
```
This searches the description/title and must include software, summer, 2025, one of the languages, engineering intern exactly, no tax, no marketing.
---
**Q: No results when using "google"?**
**A:** You have to use super specific syntax. Search for google jobs on your browser and then whatever pops up in the google jobs search box after applying some filters is what you need to copy & paste into the google_search_term.
---
**Q: Received a response code 429?**
**A:** This indicates that you have been blocked by the job board site for sending too many requests. All of the job board sites are aggressive with blocking. We recommend:
- Wait some time between scrapes (site-dependent).
- Try using the proxies param to change your IP address.
---
### JobPost Schema
@@ -170,67 +246,3 @@ Indeed specific
├── company_description
└── company_logo
```
## Supported Countries for Job Searching
### **LinkedIn**
LinkedIn searches globally & uses only the `location` parameter.
### **ZipRecruiter**
ZipRecruiter searches for jobs in **US/Canada** & uses only the `location` parameter.
### **Indeed / Glassdoor**
Indeed & Glassdoor supports most countries, but the `country_indeed` parameter is required. Additionally, use the `location`
parameter to narrow down the location, e.g. city & state if necessary.
You can specify the following countries when searching on Indeed (use the exact name, * indicates support for Glassdoor):
| | | | |
|----------------------|--------------|------------|----------------|
| Argentina | Australia* | Austria* | Bahrain |
| Belgium* | Brazil* | Canada* | Chile |
| China | Colombia | Costa Rica | Czech Republic |
| Denmark | Ecuador | Egypt | Finland |
| France* | Germany* | Greece | Hong Kong* |
| Hungary | India* | Indonesia | Ireland* |
| Israel | Italy* | Japan | Kuwait |
| Luxembourg | Malaysia | Mexico* | Morocco |
| Netherlands* | New Zealand* | Nigeria | Norway |
| Oman | Pakistan | Panama | Peru |
| Philippines | Poland | Portugal | Qatar |
| Romania | Saudi Arabia | Singapore* | South Africa |
| South Korea | Spain* | Sweden | Switzerland* |
| Taiwan | Thailand | Turkey | Ukraine |
| United Arab Emirates | UK* | USA* | Uruguay |
| Venezuela | Vietnam* | | |
## Notes
* Indeed is the best scraper currently with no rate limiting.
* All the job board endpoints are capped at around 1000 jobs on a given search.
* LinkedIn is the most restrictive and usually rate limits around the 10th page with one ip. Proxies are a must basically.
## Frequently Asked Questions
---
**Q: Why is Indeed giving unrelated roles?**
**A:** Indeed is searching each one of your terms e.g. software intern, it searches software OR intern. Try search_term='"software intern"' in quotes for stricter searching
---
**Q: Received a response code 429?**
**A:** This indicates that you have been blocked by the job board site for sending too many requests. All of the job board sites are aggressive with blocking. We recommend:
- Wait some time between scrapes (site-dependent).
- Try using the proxies param to change your IP address.
---
**Q: Encountering issues with your queries?**
**A:** Try reducing the number of `results_wanted` and/or broadening the filters. If problems
persist, [submit an issue](https://github.com/Bunsly/JobSpy/issues).
---

8
configs/config.json Normal file
View File

@@ -0,0 +1,8 @@
{
"search_terms": ["IT Support", "Help Desk"],
"results_wanted": 50,
"max_days_old": 7,
"target_state": "NY",
"user_email": "Branden@autoemployme.onmicrosoft.com"
}

View File

@@ -0,0 +1,8 @@
{
"search_terms": ["Testing", "Help Desk", "Support"],
"results_wanted": 50,
"max_days_old": 7,
"target_state": "NY",
"user_email": "Branden@autoemployme.onmicrosoft.com"
}

116
job_scraper.py Normal file
View File

@@ -0,0 +1,116 @@
import csv
import datetime
from jobspy.google import Google
from jobspy.linkedin import LinkedIn
from jobspy.indeed import Indeed
from jobspy.ziprecruiter import ZipRecruiter
from jobspy.model import ScraperInput
# Define job sources
sources = {
"google": Google,
"linkedin": LinkedIn,
"indeed": Indeed,
"zip_recruiter": ZipRecruiter,
}
# Define search preferences
search_terms = ["Automation Engineer", "CRM Manager", "Implementation Specialist"]
results_wanted = 200 # Fetch more jobs
max_days_old = 2 # Fetch jobs posted in last 48 hours
target_state = "NY" # Only keep jobs from New York
def scrape_jobs(search_terms, results_wanted, max_days_old, target_state):
"""Scrape jobs from multiple sources and filter by state."""
all_jobs = []
today = datetime.date.today()
print("\n🔎 DEBUG: Fetching jobs for search terms:", search_terms)
for search_term in search_terms:
for source_name, source_class in sources.items():
print(f"\n🚀 Scraping {search_term} from {source_name}...")
scraper = source_class()
search_criteria = ScraperInput(
site_type=[source_name],
search_term=search_term,
results_wanted=results_wanted,
)
job_response = scraper.scrape(search_criteria)
for job in job_response.jobs:
# Normalize location fields
location_city = job.location.city.strip() if job.location.city else "Unknown"
location_state = job.location.state.strip().upper() if job.location.state else "Unknown"
location_country = str(job.location.country) if job.location.country else "Unknown"
# Debug: Show all jobs being fetched
print(f"📍 Fetched Job: {job.title} - {location_city}, {location_state}, {location_country}")
# Ensure the job is recent
if job.date_posted and (today - job.date_posted).days <= max_days_old:
if location_state == target_state or job.is_remote:
print(f"✅ MATCH (In NY or Remote): {job.title} - {location_city}, {location_state} (Posted {job.date_posted})")
all_jobs.append({
"Job ID": job.id,
"Job Title (Primary)": job.title,
"Company Name": job.company_name if job.company_name else "Unknown",
"Industry": job.company_industry if job.company_industry else "Not Provided",
"Experience Level": job.job_level if job.job_level else "Not Provided",
"Job Type": job.job_type[0].name if job.job_type else "Not Provided",
"Is Remote": job.is_remote,
"Currency": job.compensation.currency if job.compensation else "",
"Salary Min": job.compensation.min_amount if job.compensation else "",
"Salary Max": job.compensation.max_amount if job.compensation else "",
"Date Posted": job.date_posted.strftime("%Y-%m-%d") if job.date_posted else "Not Provided",
"Location City": location_city,
"Location State": location_state,
"Location Country": location_country,
"Job URL": job.job_url,
"Job Description": job.description[:500] if job.description else "No description available",
"Job Source": source_name
})
else:
print(f"❌ Ignored (Wrong State): {job.title} - {location_city}, {location_state} (Posted {job.date_posted})")
else:
print(f"⏳ Ignored (Too Old): {job.title} - {location_city}, {location_state} (Posted {job.date_posted})")
print(f"\n{len(all_jobs)} jobs retrieved in NY")
return all_jobs
def save_jobs_to_csv(jobs, filename="jobspy_output.csv"):
"""Save job data to a CSV file."""
if not jobs:
print("⚠️ No jobs found matching criteria.")
return
fieldnames = [
"Job ID", "Job Title (Primary)", "Company Name", "Industry",
"Experience Level", "Job Type", "Is Remote", "Currency",
"Salary Min", "Salary Max", "Date Posted", "Location City",
"Location State", "Location Country", "Job URL", "Job Description",
"Job Source"
]
with open(filename, mode="w", newline="", encoding="utf-8") as file:
writer = csv.DictWriter(file, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(jobs)
print(f"✅ Jobs saved to {filename} ({len(jobs)} entries)")
# Run the scraper with multiple job searches
job_data = scrape_jobs(
search_terms=search_terms,
results_wanted=results_wanted,
max_days_old=max_days_old,
target_state=target_state
)
# Save results to CSV
save_jobs_to_csv(job_data)

94
job_scraper_dynamic.py Normal file
View File

@@ -0,0 +1,94 @@
import csv, datetime, os, sys, json
from jobspy.google import Google
from jobspy.linkedin import LinkedIn
from jobspy.indeed import Indeed
from jobspy.model import ScraperInput
sources = {
"google": Google,
"linkedin": LinkedIn,
"indeed": Indeed,
}
def sanitize_email(email):
return email.replace("@", "_at_").replace(".", "_")
def load_config(email):
safe_email = sanitize_email(email)
config_path = os.path.join("configs", f"config_{safe_email}.json")
if not os.path.exists(config_path):
raise FileNotFoundError(f"❌ Config for {email} not found at {config_path}")
with open(config_path, "r", encoding="utf-8") as f:
return json.load(f), safe_email
def scrape_jobs(search_terms, results_wanted, max_days_old, target_state):
today = datetime.date.today()
all_jobs = []
for term in search_terms:
for source, Scraper in sources.items():
print(f"🔍 Scraping {term} from {source}")
scraper = Scraper()
try:
jobs = scraper.scrape(ScraperInput(
site_type=[source],
search_term=term,
results_wanted=results_wanted
)).jobs
except Exception as e:
print(f"⚠️ {source} error: {e}")
continue
for job in jobs:
if job.date_posted and (today - job.date_posted).days <= max_days_old:
if target_state == (job.location.state or "").upper() or job.is_remote:
if any(term.lower() in job.title.lower() for term in search_terms):
all_jobs.append({
"Job ID": job.id,
"Job Title (Primary)": job.title,
"Company Name": job.company_name or "Unknown",
"Industry": job.company_industry or "Not Provided",
"Experience Level": job.job_level or "Not Provided",
"Job Type": job.job_type[0].name if job.job_type else "Not Provided",
"Is Remote": job.is_remote,
"Currency": job.compensation.currency if job.compensation else "",
"Salary Min": job.compensation.min_amount if job.compensation else "",
"Salary Max": job.compensation.max_amount if job.compensation else "",
"Date Posted": job.date_posted.strftime("%Y-%m-%d"),
"Location City": job.location.city or "Unknown",
"Location State": (job.location.state or "Unknown").upper(),
"Location Country": job.location.country or "Unknown",
"Job URL": job.job_url,
"Job Description": job.description.replace(",", "") if job.description else "No description",
"Job Source": source
})
print(f"✅ Found {len(all_jobs)} jobs")
return all_jobs
def save_to_csv(jobs, path):
os.makedirs(os.path.dirname(path), exist_ok=True)
fieldnames = [
"Job ID", "Job Title (Primary)", "Company Name", "Industry",
"Experience Level", "Job Type", "Is Remote", "Currency",
"Salary Min", "Salary Max", "Date Posted", "Location City",
"Location State", "Location Country", "Job URL", "Job Description", "Job Source"
]
header = "|~|".join(fieldnames)
rows = [header] + ["|~|".join(str(job.get(col, "Not Provided")).replace(",", "").strip() for col in fieldnames) for job in jobs]
with open(path, "w", encoding="utf-8") as f:
f.write(",".join(rows))
print(f"💾 Saved output to: {path}")
if __name__ == "__main__":
try:
if len(sys.argv) != 3:
raise ValueError("❌ Usage: python job_scraper_dynamic.py <user_email> <run_id>")
user_email, run_id = sys.argv[1], sys.argv[2]
config, safe_email = load_config(user_email)
jobs = scrape_jobs(config["search_terms"], config["results_wanted"], config["max_days_old"], config["target_state"])
save_to_csv(jobs, f"outputs/jobspy_output_{safe_email}_{run_id}.csv")
except Exception as e:
print(f"❌ Fatal error: {e}")
sys.exit(1)

146
job_scraper_exact_match.py Normal file
View File

@@ -0,0 +1,146 @@
import csv
import datetime
import os
from jobspy.google import Google
from jobspy.linkedin import LinkedIn
from jobspy.indeed import Indeed
from jobspy.model import ScraperInput
# Define job sources
sources = {
"google": Google,
"linkedin": LinkedIn,
"indeed": Indeed,
}
# Define search preferences
search_terms = ["Automation Engineer", "CRM Manager", "Implementation Specialist", "CRM", "Project Manager", "POS", "Microsoft Power", "IT Support"]
results_wanted = 100 # Fetch more jobs
max_days_old = 2 # Fetch jobs posted in last 48 hours
target_state = "NY" # Only keep jobs from New York
def scrape_jobs(search_terms, results_wanted, max_days_old, target_state):
"""Scrape jobs from multiple sources and filter by state."""
all_jobs = []
today = datetime.date.today()
print("\n🔎 DEBUG: Fetching jobs for search terms:", search_terms)
for search_term in search_terms:
for source_name, source_class in sources.items():
print(f"\n🚀 Scraping {search_term} from {source_name}...")
scraper = source_class()
search_criteria = ScraperInput(
site_type=[source_name],
search_term=search_term,
results_wanted=results_wanted,
)
job_response = scraper.scrape(search_criteria)
for job in job_response.jobs:
# Normalize location fields
location_city = job.location.city.strip() if job.location.city else "Unknown"
location_state = job.location.state.strip().upper() if job.location.state else "Unknown"
location_country = str(job.location.country) if job.location.country else "Unknown"
# Debug: Show all jobs being fetched
print(f"📍 Fetched Job: {job.title} - {location_city}, {location_state}, {location_country}")
# Exclude jobs that dont explicitly match the search terms
if not any(term.lower() in job.title.lower() for term in search_terms):
print(f"🚫 Excluding: {job.title} (Doesn't match {search_terms})")
continue # Skip this job
# Ensure the job is recent
if job.date_posted and (today - job.date_posted).days <= max_days_old:
# Only accept jobs if they're in NY or Remote
if location_state == target_state or job.is_remote:
print(f"✅ MATCH: {job.title} - {location_city}, {location_state} (Posted {job.date_posted})")
all_jobs.append({
"Job ID": job.id,
"Job Title (Primary)": job.title,
"Company Name": job.company_name if job.company_name else "Unknown",
"Industry": job.company_industry if job.company_industry else "Not Provided",
"Experience Level": job.job_level if job.job_level else "Not Provided",
"Job Type": job.job_type[0].name if job.job_type else "Not Provided",
"Is Remote": job.is_remote,
"Currency": job.compensation.currency if job.compensation else "",
"Salary Min": job.compensation.min_amount if job.compensation else "",
"Salary Max": job.compensation.max_amount if job.compensation else "",
"Date Posted": job.date_posted.strftime("%Y-%m-%d") if job.date_posted else "Not Provided",
"Location City": location_city,
"Location State": location_state,
"Location Country": location_country,
"Job URL": job.job_url,
"Job Description": job.description.replace(",", "") if job.description else "No description available",
"Job Source": source_name
})
else:
print(f"❌ Ignored (Wrong State): {job.title} - {location_city}, {location_state} (Posted {job.date_posted})")
else:
print(f"⏳ Ignored (Too Old): {job.title} - {location_city}, {location_state} (Posted {job.date_posted})")
print(f"\n{len(all_jobs)} jobs retrieved in NY")
return all_jobs
def save_jobs_to_csv(jobs, filename="jobspy_output.csv"):
"""Save job data to a CSV file with custom formatting:
- Fields within a record are separated by the custom delimiter |~|
- Records are separated by a comma
- All commas in field values are removed
- Blank fields are replaced with 'Not Provided'
"""
if not jobs:
print("⚠️ No jobs found matching criteria.")
return
# Remove old CSV file before writing
if os.path.exists(filename):
os.remove(filename)
fieldnames = [
"Job ID", "Job Title (Primary)", "Company Name", "Industry",
"Experience Level", "Job Type", "Is Remote", "Currency",
"Salary Min", "Salary Max", "Date Posted", "Location City",
"Location State", "Location Country", "Job URL", "Job Description",
"Job Source"
]
# Build header record using custom field delimiter
header_record = "|~|".join(fieldnames)
records = [header_record]
for job in jobs:
row = []
for field in fieldnames:
value = str(job.get(field, "")).strip()
if not value:
value = "Not Provided"
# Remove all commas from the value
value = value.replace(",", "")
row.append(value)
# Join fields with the custom delimiter
record = "|~|".join(row)
records.append(record)
# Join records with a comma as the record separator
output = ",".join(records)
with open(filename, "w", encoding="utf-8") as file:
file.write(output)
print(f"✅ Jobs saved to {filename} ({len(jobs)} entries)")
# Run the scraper with multiple job searches
job_data = scrape_jobs(
search_terms=search_terms,
results_wanted=results_wanted,
max_days_old=max_days_old,
target_state=target_state
)
# Save results to CSV with custom formatting
save_jobs_to_csv(job_data)

View File

@@ -1,29 +1,33 @@
from __future__ import annotations
import pandas as pd
from typing import Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Tuple
from .jobs import JobType, Location
from .scrapers.utils import set_logger_level, extract_salary, create_logger
from .scrapers.indeed import IndeedScraper
from .scrapers.ziprecruiter import ZipRecruiterScraper
from .scrapers.glassdoor import GlassdoorScraper
from .scrapers.google import GoogleJobsScraper
from .scrapers.linkedin import LinkedInScraper
from .scrapers import SalarySource, ScraperInput, Site, JobResponse, Country
from .scrapers.exceptions import (
LinkedInException,
IndeedException,
ZipRecruiterException,
GlassdoorException,
GoogleJobsException,
import pandas as pd
from jobspy.bayt import BaytScraper
from jobspy.glassdoor import Glassdoor
from jobspy.google import Google
from jobspy.indeed import Indeed
from jobspy.linkedin import LinkedIn
from jobspy.model import JobType, Location, JobResponse, Country
from jobspy.model import SalarySource, ScraperInput, Site
from jobspy.util import (
set_logger_level,
extract_salary,
create_logger,
get_enum_from_value,
map_str_to_site,
convert_to_annual,
desired_order,
)
from jobspy.ziprecruiter import ZipRecruiter
def scrape_jobs(
site_name: str | list[str] | Site | list[Site] | None = None,
search_term: str | None = None,
google_search_term: str | None = None,
location: str | None = None,
distance: int | None = 50,
is_remote: bool = False,
@@ -31,7 +35,6 @@ def scrape_jobs(
easy_apply: bool | None = None,
results_wanted: int = 15,
country_indeed: str = "usa",
hyperlinks: bool = False,
proxies: list[str] | str | None = None,
ca_cert: str | None = None,
description_format: str = "markdown",
@@ -40,31 +43,22 @@ def scrape_jobs(
offset: int | None = 0,
hours_old: int = None,
enforce_annual_salary: bool = False,
verbose: int = 2,
verbose: int = 0,
**kwargs,
) -> pd.DataFrame:
"""
Simultaneously scrapes job data from multiple job sites.
:return: pandas dataframe containing job data
Scrapes job data from job boards concurrently
:return: Pandas DataFrame containing job data
"""
SCRAPER_MAPPING = {
Site.LINKEDIN: LinkedInScraper,
Site.INDEED: IndeedScraper,
Site.ZIP_RECRUITER: ZipRecruiterScraper,
Site.GLASSDOOR: GlassdoorScraper,
Site.GOOGLE: GoogleJobsScraper,
Site.LINKEDIN: LinkedIn,
Site.INDEED: Indeed,
Site.ZIP_RECRUITER: ZipRecruiter,
Site.GLASSDOOR: Glassdoor,
Site.GOOGLE: Google,
Site.BAYT: BaytScraper,
}
set_logger_level(verbose)
def map_str_to_site(site_name: str) -> Site:
return Site[site_name.upper()]
def get_enum_from_value(value_str):
for job_type in JobType:
if value_str in job_type.value:
return job_type
raise Exception(f"Invalid job type: {value_str}")
job_type = get_enum_from_value(job_type) if job_type else None
def get_site_type():
@@ -86,6 +80,7 @@ def scrape_jobs(
site_type=get_site_type(),
country=country_enum,
search_term=search_term,
google_search_term=google_search_term,
location=location,
distance=distance,
is_remote=is_remote,
@@ -123,28 +118,12 @@ def scrape_jobs(
site_value, scraped_data = future.result()
site_to_jobs_dict[site_value] = scraped_data
def convert_to_annual(job_data: dict):
if job_data["interval"] == "hourly":
job_data["min_amount"] *= 2080
job_data["max_amount"] *= 2080
if job_data["interval"] == "monthly":
job_data["min_amount"] *= 12
job_data["max_amount"] *= 12
if job_data["interval"] == "weekly":
job_data["min_amount"] *= 52
job_data["max_amount"] *= 52
if job_data["interval"] == "daily":
job_data["min_amount"] *= 260
job_data["max_amount"] *= 260
job_data["interval"] = "yearly"
jobs_dfs: list[pd.DataFrame] = []
for site, job_response in site_to_jobs_dict.items():
for job in job_response.jobs:
job_data = job.dict()
job_url = job_data["job_url"]
job_data["job_url_hyper"] = f'<a href="{job_url}">{job_url}</a>'
job_data["site"] = site
job_data["company"] = job_data["company_name"]
job_data["job_type"] = (
@@ -207,38 +186,6 @@ def scrape_jobs(
# Step 2: Concatenate the filtered DataFrames
jobs_df = pd.concat(filtered_dfs, ignore_index=True)
# Desired column order
desired_order = [
"id",
"site",
"job_url_hyper" if hyperlinks else "job_url",
"job_url_direct",
"title",
"company",
"location",
"job_type",
"date_posted",
"salary_source",
"interval",
"min_amount",
"max_amount",
"currency",
"is_remote",
"job_level",
"job_function",
"listing_type",
"emails",
"description",
"company_industry",
"company_url",
"company_logo",
"company_url_direct",
"company_addresses",
"company_num_employees",
"company_revenue",
"company_description",
]
# Step 3: Ensure all desired columns are present, adding missing ones as empty
for column in desired_order:
if column not in jobs_df.columns:
@@ -248,6 +195,8 @@ def scrape_jobs(
jobs_df = jobs_df[desired_order]
# Step 4: Sort the DataFrame as required
return jobs_df.sort_values(by=["site", "date_posted"], ascending=[True, False])
return jobs_df.sort_values(
by=["site", "date_posted"], ascending=[True, False]
).reset_index(drop=True)
else:
return pd.DataFrame()

145
jobspy/bayt/__init__.py Normal file
View File

@@ -0,0 +1,145 @@
from __future__ import annotations
import random
import time
from bs4 import BeautifulSoup
from jobspy.model import (
Scraper,
ScraperInput,
Site,
JobPost,
JobResponse,
Location,
Country,
)
from jobspy.util import create_logger, create_session
log = create_logger("Bayt")
class BaytScraper(Scraper):
base_url = "https://www.bayt.com"
delay = 2
band_delay = 3
def __init__(
self, proxies: list[str] | str | None = None, ca_cert: str | None = None
):
super().__init__(Site.BAYT, proxies=proxies, ca_cert=ca_cert)
self.scraper_input = None
self.session = None
self.country = "worldwide"
def scrape(self, scraper_input: ScraperInput) -> JobResponse:
self.scraper_input = scraper_input
self.session = create_session(
proxies=self.proxies, ca_cert=self.ca_cert, is_tls=False, has_retry=True
)
job_list: list[JobPost] = []
page = 1
results_wanted = (
scraper_input.results_wanted if scraper_input.results_wanted else 10
)
while len(job_list) < results_wanted:
log.info(f"Fetching Bayt jobs page {page}")
job_elements = self._fetch_jobs(self.scraper_input.search_term, page)
if not job_elements:
break
if job_elements:
log.debug(
"First job element snippet:\n" + job_elements[0].prettify()[:500]
)
initial_count = len(job_list)
for job in job_elements:
try:
job_post = self._extract_job_info(job)
if job_post:
job_list.append(job_post)
if len(job_list) >= results_wanted:
break
else:
log.debug(
"Extraction returned None. Job snippet:\n"
+ job.prettify()[:500]
)
except Exception as e:
log.error(f"Bayt: Error extracting job info: {str(e)}")
continue
if len(job_list) == initial_count:
log.info(f"No new jobs found on page {page}. Ending pagination.")
break
page += 1
time.sleep(random.uniform(self.delay, self.delay + self.band_delay))
job_list = job_list[: scraper_input.results_wanted]
return JobResponse(jobs=job_list)
def _fetch_jobs(self, query: str, page: int) -> list | None:
"""
Grabs the job results for the given query and page number.
"""
try:
url = f"{self.base_url}/en/international/jobs/{query}-jobs/?page={page}"
response = self.session.get(url)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
job_listings = soup.find_all("li", attrs={"data-js-job": ""})
log.debug(f"Found {len(job_listings)} job listing elements")
return job_listings
except Exception as e:
log.error(f"Bayt: Error fetching jobs - {str(e)}")
return None
def _extract_job_info(self, job: BeautifulSoup) -> JobPost | None:
"""
Extracts the job information from a single job listing.
"""
# Find the h2 element holding the title and link (no class filtering)
job_general_information = job.find("h2")
if not job_general_information:
return
job_title = job_general_information.get_text(strip=True)
job_url = self._extract_job_url(job_general_information)
if not job_url:
return
# Extract company name using the original approach:
company_tag = job.find("div", class_="t-nowrap p10l")
company_name = (
company_tag.find("span").get_text(strip=True)
if company_tag and company_tag.find("span")
else None
)
# Extract location using the original approach:
location_tag = job.find("div", class_="t-mute t-small")
location = location_tag.get_text(strip=True) if location_tag else None
job_id = f"bayt-{abs(hash(job_url))}"
location_obj = Location(
city=location,
country=Country.from_string(self.country),
)
return JobPost(
id=job_id,
title=job_title,
company_name=company_name,
location=location_obj,
job_url=job_url,
)
def _extract_job_url(self, job_general_information: BeautifulSoup) -> str | None:
"""
Pulls the job URL from the 'a' within the h2 element.
"""
a_tag = job_general_information.find("a")
if a_tag and a_tag.has_attr("href"):
return self.base_url + a_tag["href"].strip()

View File

@@ -1,5 +1,5 @@
"""
jobspy.scrapers.exceptions
jobspy.jobboard.exceptions
~~~~~~~~~~~~~~~~~~~
This module contains the set of Scrapers' exceptions.
@@ -29,3 +29,8 @@ class GlassdoorException(Exception):
class GoogleJobsException(Exception):
def __init__(self, message=None):
super().__init__(message or "An error occurred with Google Jobs")
class BaytException(Exception):
def __init__(self, message=None):
super().__init__(message or "An error occurred with Bayt")

View File

@@ -1,41 +1,38 @@
"""
jobspy.scrapers.glassdoor
~~~~~~~~~~~~~~~~~~~
This module contains routines to scrape Glassdoor.
"""
from __future__ import annotations
import re
import json
import requests
from typing import Optional, Tuple
from typing import Tuple
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
from .constants import fallback_token, query_template, headers
from .. import Scraper, ScraperInput, Site
from ..utils import extract_emails_from_text, create_logger
from ..exceptions import GlassdoorException
from ..utils import (
from jobspy.glassdoor.constant import fallback_token, query_template, headers
from jobspy.glassdoor.util import (
get_cursor_for_page,
parse_compensation,
parse_location,
)
from jobspy.util import (
extract_emails_from_text,
create_logger,
create_session,
markdown_converter,
)
from ...jobs import (
from jobspy.exception import GlassdoorException
from jobspy.model import (
JobPost,
Compensation,
CompensationInterval,
Location,
JobResponse,
JobType,
DescriptionFormat,
Scraper,
ScraperInput,
Site,
)
logger = create_logger("Glassdoor")
log = create_logger("Glassdoor")
class GlassdoorScraper(Scraper):
class Glassdoor(Scraper):
def __init__(
self, proxies: list[str] | str | None = None, ca_cert: str | None = None
):
@@ -64,7 +61,7 @@ class GlassdoorScraper(Scraper):
self.base_url = self.scraper_input.country.get_glassdoor_url()
self.session = create_session(
proxies=self.proxies, ca_cert=self.ca_cert, is_tls=True, has_retry=True
proxies=self.proxies, ca_cert=self.ca_cert, has_retry=True
)
token = self._get_csrf_token()
headers["gd-csrf-token"] = token if token else fallback_token
@@ -74,7 +71,7 @@ class GlassdoorScraper(Scraper):
scraper_input.location, scraper_input.is_remote
)
if location_type is None:
logger.error("Glassdoor: location not parsed")
log.error("Glassdoor: location not parsed")
return JobResponse(jobs=[])
job_list: list[JobPost] = []
cursor = None
@@ -83,7 +80,7 @@ class GlassdoorScraper(Scraper):
tot_pages = (scraper_input.results_wanted // self.jobs_per_page) + 2
range_end = min(tot_pages, self.max_pages + 1)
for page in range(range_start, range_end):
logger.info(f"search page: {page} / {range_end-1}")
log.info(f"search page: {page} / {range_end - 1}")
try:
jobs, cursor = self._fetch_jobs_page(
scraper_input, location_id, location_type, page, cursor
@@ -93,7 +90,7 @@ class GlassdoorScraper(Scraper):
job_list = job_list[: scraper_input.results_wanted]
break
except Exception as e:
logger.error(f"Glassdoor: {str(e)}")
log.error(f"Glassdoor: {str(e)}")
break
return JobResponse(jobs=job_list)
@@ -129,7 +126,7 @@ class GlassdoorScraper(Scraper):
ValueError,
Exception,
) as e:
logger.error(f"Glassdoor: {str(e)}")
log.error(f"Glassdoor: {str(e)}")
return jobs, None
jobs_data = res_json["data"]["jobListings"]["jobListings"]
@@ -146,7 +143,7 @@ class GlassdoorScraper(Scraper):
except Exception as exc:
raise GlassdoorException(f"Glassdoor generated an exception: {exc}")
return jobs, self.get_cursor_for_page(
return jobs, get_cursor_for_page(
res_json["data"]["jobListings"]["paginationCursors"], page_num + 1
)
@@ -185,9 +182,9 @@ class GlassdoorScraper(Scraper):
if location_type == "S":
is_remote = True
else:
location = self.parse_location(location_name)
location = parse_location(location_name)
compensation = self.parse_compensation(job["header"])
compensation = parse_compensation(job["header"])
try:
description = self._fetch_job_description(job_id)
except:
@@ -264,12 +261,12 @@ class GlassdoorScraper(Scraper):
if res.status_code != 200:
if res.status_code == 429:
err = f"429 Response - Blocked by Glassdoor for too many requests"
logger.error(err)
log.error(err)
return None, None
else:
err = f"Glassdoor response status code {res.status_code}"
err += f" - {res.text}"
logger.error(f"Glassdoor response status code {res.status_code}")
log.error(f"Glassdoor response status code {res.status_code}")
return None, None
items = res.json()
@@ -321,44 +318,3 @@ class GlassdoorScraper(Scraper):
{"filterKey": "jobType", "values": self.scraper_input.job_type.value[0]}
)
return json.dumps([payload])
@staticmethod
def parse_compensation(data: dict) -> Optional[Compensation]:
pay_period = data.get("payPeriod")
adjusted_pay = data.get("payPeriodAdjustedPay")
currency = data.get("payCurrency", "USD")
if not pay_period or not adjusted_pay:
return None
interval = None
if pay_period == "ANNUAL":
interval = CompensationInterval.YEARLY
elif pay_period:
interval = CompensationInterval.get_interval(pay_period)
min_amount = int(adjusted_pay.get("p10") // 1)
max_amount = int(adjusted_pay.get("p90") // 1)
return Compensation(
interval=interval,
min_amount=min_amount,
max_amount=max_amount,
currency=currency,
)
@staticmethod
def get_job_type_enum(job_type_str: str) -> list[JobType] | None:
for job_type in JobType:
if job_type_str in job_type.value:
return [job_type]
@staticmethod
def parse_location(location_name: str) -> Location | None:
if not location_name or location_name == "Remote":
return
city, _, state = location_name.partition(", ")
return Location(city=city, state=state)
@staticmethod
def get_cursor_for_page(pagination_cursors, page_num):
for cursor_data in pagination_cursors:
if cursor_data["pageNumber"] == page_num:
return cursor_data["cursor"]

42
jobspy/glassdoor/util.py Normal file
View File

@@ -0,0 +1,42 @@
from jobspy.model import Compensation, CompensationInterval, Location, JobType
def parse_compensation(data: dict) -> Compensation | None:
pay_period = data.get("payPeriod")
adjusted_pay = data.get("payPeriodAdjustedPay")
currency = data.get("payCurrency", "USD")
if not pay_period or not adjusted_pay:
return None
interval = None
if pay_period == "ANNUAL":
interval = CompensationInterval.YEARLY
elif pay_period:
interval = CompensationInterval.get_interval(pay_period)
min_amount = int(adjusted_pay.get("p10") // 1)
max_amount = int(adjusted_pay.get("p90") // 1)
return Compensation(
interval=interval,
min_amount=min_amount,
max_amount=max_amount,
currency=currency,
)
def get_job_type_enum(job_type_str: str) -> list[JobType] | None:
for job_type in JobType:
if job_type_str in job_type.value:
return [job_type]
def parse_location(location_name: str) -> Location | None:
if not location_name or location_name == "Remote":
return
city, _, state = location_name.partition(", ")
return Location(city=city, state=state)
def get_cursor_for_page(pagination_cursors, page_num):
for cursor_data in pagination_cursors:
if cursor_data["pageNumber"] == page_num:
return cursor_data["cursor"]

View File

@@ -1,10 +1,3 @@
"""
jobspy.scrapers.google
~~~~~~~~~~~~~~~~~~~
This module contains routines to scrape Glassdoor.
"""
from __future__ import annotations
import math
@@ -13,33 +6,30 @@ import json
from typing import Tuple
from datetime import datetime, timedelta
from .constants import headers_jobs, headers_initial, async_param
from .. import Scraper, ScraperInput, Site
from ..utils import extract_emails_from_text, create_logger, extract_job_type
from ..utils import (
create_session,
)
from ...jobs import (
from jobspy.google.constant import headers_jobs, headers_initial, async_param
from jobspy.model import (
Scraper,
ScraperInput,
Site,
JobPost,
JobResponse,
Location,
JobType,
)
logger = create_logger("Google")
from jobspy.util import extract_emails_from_text, extract_job_type, create_session
from jobspy.google.util import log, find_job_info_initial_page, find_job_info
class GoogleJobsScraper(Scraper):
class Google(Scraper):
def __init__(
self, proxies: list[str] | str | None = None, ca_cert: str | None = None
):
"""
Initializes GlassdoorScraper with the Glassdoor job search url
Initializes Google Scraper with the Goodle jobs search url
"""
site = Site(Site.GOOGLE)
super().__init__(site, proxies=proxies, ca_cert=ca_cert)
self.base_url = None
self.country = None
self.session = None
self.scraper_input = None
@@ -50,35 +40,39 @@ class GoogleJobsScraper(Scraper):
def scrape(self, scraper_input: ScraperInput) -> JobResponse:
"""
Scrapes Glassdoor for jobs with scraper_input criteria.
Scrapes Google for jobs with scraper_input criteria.
:param scraper_input: Information about job search criteria.
:return: JobResponse containing a list of jobs.
"""
self.scraper_input = scraper_input
self.scraper_input.results_wanted = min(900, scraper_input.results_wanted)
self.base_url = self.scraper_input.country.get_glassdoor_url()
self.session = create_session(
proxies=self.proxies, ca_cert=self.ca_cert, is_tls=False, has_retry=True
)
forward_cursor = self._get_initial_cursor()
forward_cursor, job_list = self._get_initial_cursor_and_jobs()
if forward_cursor is None:
logger.error("initial cursor not found")
return JobResponse(jobs=[])
log.warning(
"initial cursor not found, try changing your query or there was at most 10 results"
)
return JobResponse(jobs=job_list)
page = 1
job_list: list[JobPost] = []
while (
len(self.seen_urls) < scraper_input.results_wanted + scraper_input.offset
and forward_cursor
):
logger.info(
log.info(
f"search page: {page} / {math.ceil(scraper_input.results_wanted / self.jobs_per_page)}"
)
jobs, forward_cursor = self._get_jobs_next_page(forward_cursor)
try:
jobs, forward_cursor = self._get_jobs_next_page(forward_cursor)
except Exception as e:
log.error(f"failed to get jobs on page: {page}, {e}")
break
if not jobs:
logger.info(f"found no jobs on page: {page}")
log.info(f"found no jobs on page: {page}")
break
job_list += jobs
page += 1
@@ -89,8 +83,8 @@ class GoogleJobsScraper(Scraper):
]
)
def _get_initial_cursor(self):
"""Gets initial cursor to paginate through job listings"""
def _get_initial_cursor_and_jobs(self) -> Tuple[str, list[JobPost]]:
"""Gets initial cursor and jobs to paginate through job listings"""
query = f"{self.scraper_input.search_term} jobs"
def get_time_range(hours_old):
@@ -123,13 +117,22 @@ class GoogleJobsScraper(Scraper):
if self.scraper_input.is_remote:
query += " remote"
if self.scraper_input.google_search_term:
query = self.scraper_input.google_search_term
params = {"q": query, "udm": "8"}
response = self.session.get(self.url, headers=headers_initial, params=params)
pattern_fc = r'<div jsname="Yust4d"[^>]+data-async-fc="([^"]+)"'
match_fc = re.search(pattern_fc, response.text)
data_async_fc = match_fc.group(1) if match_fc else None
return data_async_fc
jobs_raw = find_job_info_initial_page(response.text)
jobs = []
for job_raw in jobs_raw:
job_post = self._parse_job(job_raw)
if job_post:
jobs.append(job_post)
return data_async_fc, jobs
def _get_jobs_next_page(self, forward_cursor: str) -> Tuple[list[JobPost], str]:
params = {"fc": [forward_cursor], "fcv": ["3"], "async": [async_param]}
@@ -149,69 +152,51 @@ class GoogleJobsScraper(Scraper):
match_fc = re.search(pattern_fc, job_data)
data_async_fc = match_fc.group(1) if match_fc else None
jobs_on_page = []
for array in parsed:
_, job_data = array
if not job_data.startswith("[[["):
continue
job_d = json.loads(job_data)
job_info = self._find_job_info(job_d)
job_url = job_info[3][0][0] if job_info[3] and job_info[3][0] else None
if job_url in self.seen_urls:
continue
self.seen_urls.add(job_url)
title = job_info[0]
company_name = job_info[1]
location = city = job_info[2]
state = country = date_posted = None
if location and "," in location:
city, state, *country = [*map(lambda x: x.strip(), location.split(","))]
days_ago_str = job_info[12]
if type(days_ago_str) == str:
match = re.search(r"\d+", days_ago_str)
days_ago = int(match.group()) if match else None
date_posted = (datetime.now() - timedelta(days=days_ago)).date()
description = job_info[19]
job_post = JobPost(
id=f"go-{job_info[28]}",
title=title,
company_name=company_name,
location=Location(
city=city, state=state, country=country[0] if country else None
),
job_url=job_url,
job_url_direct=job_url,
date_posted=date_posted,
is_remote="remote" in description.lower()
or "wfh" in description.lower(),
description=description,
emails=extract_emails_from_text(description),
job_type=extract_job_type(description),
)
jobs_on_page.append(job_post)
job_info = find_job_info(job_d)
job_post = self._parse_job(job_info)
if job_post:
jobs_on_page.append(job_post)
return jobs_on_page, data_async_fc
@staticmethod
def _find_job_info(jobs_data: list | dict) -> list | None:
"""Iterates through the JSON data to find the job listings"""
if isinstance(jobs_data, dict):
for key, value in jobs_data.items():
if key == "520084652" and isinstance(value, list):
return value
else:
result = GoogleJobsScraper._find_job_info(value)
if result:
return result
elif isinstance(jobs_data, list):
for item in jobs_data:
result = GoogleJobsScraper._find_job_info(item)
if result:
return result
return None
def _parse_job(self, job_info: list):
job_url = job_info[3][0][0] if job_info[3] and job_info[3][0] else None
if job_url in self.seen_urls:
return
self.seen_urls.add(job_url)
title = job_info[0]
company_name = job_info[1]
location = city = job_info[2]
state = country = date_posted = None
if location and "," in location:
city, state, *country = [*map(lambda x: x.strip(), location.split(","))]
days_ago_str = job_info[12]
if type(days_ago_str) == str:
match = re.search(r"\d+", days_ago_str)
days_ago = int(match.group()) if match else None
date_posted = (datetime.now() - timedelta(days=days_ago)).date()
description = job_info[19]
job_post = JobPost(
id=f"go-{job_info[28]}",
title=title,
company_name=company_name,
location=Location(
city=city, state=state, country=country[0] if country else None
),
job_url=job_url,
date_posted=date_posted,
is_remote="remote" in description.lower() or "wfh" in description.lower(),
description=description,
emails=extract_emails_from_text(description),
job_type=extract_job_type(description),
)
return job_post

41
jobspy/google/util.py Normal file
View File

@@ -0,0 +1,41 @@
import re
from jobspy.util import create_logger
log = create_logger("Google")
def find_job_info(jobs_data: list | dict) -> list | None:
"""Iterates through the JSON data to find the job listings"""
if isinstance(jobs_data, dict):
for key, value in jobs_data.items():
if key == "520084652" and isinstance(value, list):
return value
else:
result = find_job_info(value)
if result:
return result
elif isinstance(jobs_data, list):
for item in jobs_data:
result = find_job_info(item)
if result:
return result
return None
def find_job_info_initial_page(html_text: str):
pattern = f'520084652":(' + r"\[.*?\]\s*])\s*}\s*]\s*]\s*]\s*]\s*]"
results = []
matches = re.finditer(pattern, html_text)
import json
for match in matches:
try:
parsed_data = json.loads(match.group(1))
results.append(parsed_data)
except json.JSONDecodeError as e:
log.error(f"Failed to parse match: {str(e)}")
results.append({"raw_match": match.group(0), "error": str(e)})
return results

View File

@@ -1,39 +1,32 @@
"""
jobspy.scrapers.indeed
~~~~~~~~~~~~~~~~~~~
This module contains routines to scrape Indeed.
"""
from __future__ import annotations
import math
from typing import Tuple
from datetime import datetime
from typing import Tuple
from .constants import job_search_query, api_headers
from .. import Scraper, ScraperInput, Site
from ..utils import (
extract_emails_from_text,
get_enum_from_job_type,
markdown_converter,
create_session,
create_logger,
)
from ...jobs import (
from jobspy.indeed.constant import job_search_query, api_headers
from jobspy.indeed.util import is_job_remote, get_compensation, get_job_type
from jobspy.model import (
Scraper,
ScraperInput,
Site,
JobPost,
Compensation,
CompensationInterval,
Location,
JobResponse,
JobType,
DescriptionFormat,
)
from jobspy.util import (
extract_emails_from_text,
markdown_converter,
create_session,
create_logger,
)
logger = create_logger("Indeed")
log = create_logger("Indeed")
class IndeedScraper(Scraper):
class Indeed(Scraper):
def __init__(
self, proxies: list[str] | str | None = None, ca_cert: str | None = None
):
@@ -71,12 +64,12 @@ class IndeedScraper(Scraper):
cursor = None
while len(self.seen_urls) < scraper_input.results_wanted + scraper_input.offset:
logger.info(
log.info(
f"search page: {page} / {math.ceil(scraper_input.results_wanted / self.jobs_per_page)}"
)
jobs, cursor = self._scrape_page(cursor)
if not jobs:
logger.info(f"found no jobs on page: {page}")
log.info(f"found no jobs on page: {page}")
break
job_list += jobs
page += 1
@@ -122,9 +115,10 @@ class IndeedScraper(Scraper):
headers=api_headers_temp,
json=payload,
timeout=10,
verify=False,
)
if not response.ok:
logger.info(
log.info(
f"responded with status code: {response.status_code} (submit GitHub issue if this appears to be a bug)"
)
return jobs, new_cursor
@@ -211,8 +205,10 @@ class IndeedScraper(Scraper):
description = job["description"]["html"]
if self.scraper_input.description_format == DescriptionFormat.MARKDOWN:
description = markdown_converter(description)
description = description.replace(",", "")
job_type = self._get_job_type(job["attributes"])
job_type = get_job_type(job["attributes"])
timestamp_seconds = job["datePublished"] / 1000
date_posted = datetime.fromtimestamp(timestamp_seconds).strftime("%Y-%m-%d")
employer = job["employer"].get("dossier") if job["employer"] else None
@@ -233,14 +229,14 @@ class IndeedScraper(Scraper):
country=job.get("location", {}).get("countryCode"),
),
job_type=job_type,
compensation=self._get_compensation(job["compensation"]),
compensation=get_compensation(job["compensation"]),
date_posted=date_posted,
job_url=job_url,
job_url_direct=(
job["recruit"].get("viewJobUrl") if job.get("recruit") else None
),
emails=extract_emails_from_text(description) if description else None,
is_remote=self._is_job_remote(job, description),
is_remote=is_job_remote(job, description),
company_addresses=(
employer_details["addresses"][0]
if employer_details.get("addresses")
@@ -264,86 +260,3 @@ class IndeedScraper(Scraper):
else None
),
)
@staticmethod
def _get_job_type(attributes: list) -> list[JobType]:
"""
Parses the attributes to get list of job types
:param attributes:
:return: list of JobType
"""
job_types: list[JobType] = []
for attribute in attributes:
job_type_str = attribute["label"].replace("-", "").replace(" ", "").lower()
job_type = get_enum_from_job_type(job_type_str)
if job_type:
job_types.append(job_type)
return job_types
@staticmethod
def _get_compensation(compensation: dict) -> Compensation | None:
"""
Parses the job to get compensation
:param job:
:return: compensation object
"""
if not compensation["baseSalary"] and not compensation["estimated"]:
return None
comp = (
compensation["baseSalary"]
if compensation["baseSalary"]
else compensation["estimated"]["baseSalary"]
)
if not comp:
return None
interval = IndeedScraper._get_compensation_interval(comp["unitOfWork"])
if not interval:
return None
min_range = comp["range"].get("min")
max_range = comp["range"].get("max")
return Compensation(
interval=interval,
min_amount=int(min_range) if min_range is not None else None,
max_amount=int(max_range) if max_range is not None else None,
currency=(
compensation["estimated"]["currencyCode"]
if compensation["estimated"]
else compensation["currencyCode"]
),
)
@staticmethod
def _is_job_remote(job: dict, description: str) -> bool:
"""
Searches the description, location, and attributes to check if job is remote
"""
remote_keywords = ["remote", "work from home", "wfh"]
is_remote_in_attributes = any(
any(keyword in attr["label"].lower() for keyword in remote_keywords)
for attr in job["attributes"]
)
is_remote_in_description = any(
keyword in description.lower() for keyword in remote_keywords
)
is_remote_in_location = any(
keyword in job["location"]["formatted"]["long"].lower()
for keyword in remote_keywords
)
return (
is_remote_in_attributes or is_remote_in_description or is_remote_in_location
)
@staticmethod
def _get_compensation_interval(interval: str) -> CompensationInterval:
interval_mapping = {
"DAY": "DAILY",
"YEAR": "YEARLY",
"HOUR": "HOURLY",
"WEEK": "WEEKLY",
"MONTH": "MONTHLY",
}
mapped_interval = interval_mapping.get(interval.upper(), None)
if mapped_interval and mapped_interval in CompensationInterval.__members__:
return CompensationInterval[mapped_interval]
else:
raise ValueError(f"Unsupported interval: {interval}")

80
jobspy/indeed/util.py Normal file
View File

@@ -0,0 +1,80 @@
from jobspy.model import CompensationInterval, JobType, Compensation
from jobspy.util import get_enum_from_job_type
def get_job_type(attributes: list) -> list[JobType]:
"""
Parses the attributes to get list of job types
:param attributes:
:return: list of JobType
"""
job_types: list[JobType] = []
for attribute in attributes:
job_type_str = attribute["label"].replace("-", "").replace(" ", "").lower()
job_type = get_enum_from_job_type(job_type_str)
if job_type:
job_types.append(job_type)
return job_types
def get_compensation(compensation: dict) -> Compensation | None:
"""
Parses the job to get compensation
:param sssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrompensation:
:return: compensation object
"""
if not compensation["baseSalary"] and not compensation["estimated"]:
return None
comp = (
compensation["baseSalary"]
if compensation["baseSalary"]
else compensation["estimated"]["baseSalary"]
)
if not comp:
return None
interval = get_compensation_interval(comp["unitOfWork"])
if not interval:
return None
min_range = comp["range"].get("min")
max_range = comp["range"].get("max")
return Compensation(
interval=interval,
min_amount=int(min_range) if min_range is not None else None,
max_amount=int(max_range) if max_range is not None else None,
currency=(
compensation["estimated"]["currencyCode"]
if compensation["estimated"]
else compensation["currencyCode"]
),
)
def is_job_remote(job: dict, description: str) -> bool:
"""
Searches the description, location, and attributes to check if job is remote
"""
remote_keywords = ["remote", "work from home", "wfh"]
is_remote_in_attributes = any(
any(keyword in attr["label"].lower() for keyword in remote_keywords)
for attr in job["attributes"]
)
is_remote_in_location = any(
keyword in job["location"]["formatted"]["long"].lower()
for keyword in remote_keywords
)
return is_remote_in_attributes or is_remote_in_location
def get_compensation_interval(interval: str) -> CompensationInterval:
interval_mapping = {
"DAY": "DAILY",
"YEAR": "YEARLY",
"HOUR": "HOURLY",
"WEEK": "WEEKLY",
"MONTH": "MONTHLY",
}
mapped_interval = interval_mapping.get(interval.upper(), None)
if mapped_interval and mapped_interval in CompensationInterval.__members__:
return CompensationInterval[mapped_interval]
else:
raise ValueError(f"Unsupported interval: {interval}")

View File

@@ -1,47 +1,48 @@
"""
jobspy.scrapers.linkedin
~~~~~~~~~~~~~~~~~~~
This module contains routines to scrape LinkedIn.
"""
from __future__ import annotations
import math
import time
import random
import regex as re
from typing import Optional
import time
from datetime import datetime
from bs4.element import Tag
from bs4 import BeautifulSoup
from typing import Optional
from urllib.parse import urlparse, urlunparse, unquote
from .constants import headers
from .. import Scraper, ScraperInput, Site
from ..exceptions import LinkedInException
from ..utils import create_session, remove_attributes, create_logger
from ...jobs import (
import regex as re
from bs4 import BeautifulSoup
from bs4.element import Tag
from jobspy.exception import LinkedInException
from jobspy.linkedin.constant import headers
from jobspy.linkedin.util import (
job_type_code,
parse_job_type,
parse_job_level,
parse_company_industry,
)
from jobspy.model import (
JobPost,
Location,
JobResponse,
JobType,
Country,
Compensation,
DescriptionFormat,
Scraper,
ScraperInput,
Site,
)
from ..utils import (
from jobspy.util import (
extract_emails_from_text,
get_enum_from_job_type,
currency_parser,
markdown_converter,
create_session,
remove_attributes,
create_logger,
)
logger = create_logger("LinkedIn")
log = create_logger("LinkedIn")
class LinkedInScraper(Scraper):
class LinkedIn(Scraper):
base_url = "https://www.linkedin.com"
delay = 3
band_delay = 4
@@ -86,7 +87,7 @@ class LinkedInScraper(Scraper):
)
while continue_search():
request_count += 1
logger.info(
log.info(
f"search page: {request_count} / {math.ceil(scraper_input.results_wanted / 10)}"
)
params = {
@@ -95,7 +96,7 @@ class LinkedInScraper(Scraper):
"distance": scraper_input.distance,
"f_WT": 2 if scraper_input.is_remote else None,
"f_JT": (
self.job_type_code(scraper_input.job_type)
job_type_code(scraper_input.job_type)
if scraper_input.job_type
else None
),
@@ -126,13 +127,13 @@ class LinkedInScraper(Scraper):
else:
err = f"LinkedIn response status code {response.status_code}"
err += f" - {response.text}"
logger.error(err)
log.error(err)
return JobResponse(jobs=job_list)
except Exception as e:
if "Proxy responded with" in str(e):
logger.error(f"LinkedIn: Bad proxy")
log.error(f"LinkedIn: Bad proxy")
else:
logger.error(f"LinkedIn: {str(e)}")
log.error(f"LinkedIn: {str(e)}")
return JobResponse(jobs=job_list)
soup = BeautifulSoup(response.text, "html.parser")
@@ -216,6 +217,8 @@ class LinkedInScraper(Scraper):
job_details = {}
if full_descr:
job_details = self._get_job_details(job_id)
description = description.replace(",", "")
return JobPost(
id=f"li-{job_id}",
@@ -282,9 +285,9 @@ class LinkedInScraper(Scraper):
)
return {
"description": description,
"job_level": self._parse_job_level(soup),
"company_industry": self._parse_company_industry(soup),
"job_type": self._parse_job_type(soup),
"job_level": parse_job_level(soup),
"company_industry": parse_company_industry(soup),
"job_type": parse_job_type(soup),
"job_url_direct": self._parse_job_url_direct(soup),
"company_logo": company_logo,
"job_function": job_function,
@@ -316,77 +319,6 @@ class LinkedInScraper(Scraper):
location = Location(city=city, state=state, country=country)
return location
@staticmethod
def _parse_job_type(soup_job_type: BeautifulSoup) -> list[JobType] | None:
"""
Gets the job type from job page
:param soup_job_type:
:return: JobType
"""
h3_tag = soup_job_type.find(
"h3",
class_="description__job-criteria-subheader",
string=lambda text: "Employment type" in text,
)
employment_type = None
if h3_tag:
employment_type_span = h3_tag.find_next_sibling(
"span",
class_="description__job-criteria-text description__job-criteria-text--criteria",
)
if employment_type_span:
employment_type = employment_type_span.get_text(strip=True)
employment_type = employment_type.lower()
employment_type = employment_type.replace("-", "")
return [get_enum_from_job_type(employment_type)] if employment_type else []
@staticmethod
def _parse_job_level(soup_job_level: BeautifulSoup) -> str | None:
"""
Gets the job level from job page
:param soup_job_level:
:return: str
"""
h3_tag = soup_job_level.find(
"h3",
class_="description__job-criteria-subheader",
string=lambda text: "Seniority level" in text,
)
job_level = None
if h3_tag:
job_level_span = h3_tag.find_next_sibling(
"span",
class_="description__job-criteria-text description__job-criteria-text--criteria",
)
if job_level_span:
job_level = job_level_span.get_text(strip=True)
return job_level
@staticmethod
def _parse_company_industry(soup_industry: BeautifulSoup) -> str | None:
"""
Gets the company industry from job page
:param soup_industry:
:return: str
"""
h3_tag = soup_industry.find(
"h3",
class_="description__job-criteria-subheader",
string=lambda text: "Industries" in text,
)
industry = None
if h3_tag:
industry_span = h3_tag.find_next_sibling(
"span",
class_="description__job-criteria-text description__job-criteria-text--criteria",
)
if industry_span:
industry = industry_span.get_text(strip=True)
return industry
def _parse_job_url_direct(self, soup: BeautifulSoup) -> str | None:
"""
Gets the job url direct from job page
@@ -403,13 +335,3 @@ class LinkedInScraper(Scraper):
job_url_direct = unquote(job_url_direct_match.group())
return job_url_direct
@staticmethod
def job_type_code(job_type_enum: JobType) -> str:
return {
JobType.FULL_TIME: "F",
JobType.PART_TIME: "P",
JobType.INTERNSHIP: "I",
JobType.CONTRACT: "C",
JobType.TEMPORARY: "T",
}.get(job_type_enum, "")

85
jobspy/linkedin/util.py Normal file
View File

@@ -0,0 +1,85 @@
from bs4 import BeautifulSoup
from jobspy.model import JobType
from jobspy.util import get_enum_from_job_type
def job_type_code(job_type_enum: JobType) -> str:
return {
JobType.FULL_TIME: "F",
JobType.PART_TIME: "P",
JobType.INTERNSHIP: "I",
JobType.CONTRACT: "C",
JobType.TEMPORARY: "T",
}.get(job_type_enum, "")
def parse_job_type(soup_job_type: BeautifulSoup) -> list[JobType] | None:
"""
Gets the job type from job page
:param soup_job_type:
:return: JobType
"""
h3_tag = soup_job_type.find(
"h3",
class_="description__job-criteria-subheader",
string=lambda text: "Employment type" in text,
)
employment_type = None
if h3_tag:
employment_type_span = h3_tag.find_next_sibling(
"span",
class_="description__job-criteria-text description__job-criteria-text--criteria",
)
if employment_type_span:
employment_type = employment_type_span.get_text(strip=True)
employment_type = employment_type.lower()
employment_type = employment_type.replace("-", "")
return [get_enum_from_job_type(employment_type)] if employment_type else []
def parse_job_level(soup_job_level: BeautifulSoup) -> str | None:
"""
Gets the job level from job page
:param soup_job_level:
:return: str
"""
h3_tag = soup_job_level.find(
"h3",
class_="description__job-criteria-subheader",
string=lambda text: "Seniority level" in text,
)
job_level = None
if h3_tag:
job_level_span = h3_tag.find_next_sibling(
"span",
class_="description__job-criteria-text description__job-criteria-text--criteria",
)
if job_level_span:
job_level = job_level_span.get_text(strip=True)
return job_level
def parse_company_industry(soup_industry: BeautifulSoup) -> str | None:
"""
Gets the company industry from job page
:param soup_industry:
:return: str
"""
h3_tag = soup_industry.find(
"h3",
class_="description__job-criteria-subheader",
string=lambda text: "Industries" in text,
)
industry = None
if h3_tag:
industry_span = h3_tag.find_next_sibling(
"span",
class_="description__job-criteria-text description__job-criteria-text--criteria",
)
if industry_span:
industry = industry_span.get_text(strip=True)
return industry

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Optional
from datetime import date
from enum import Enum
@@ -68,16 +69,20 @@ class Country(Enum):
AUSTRIA = ("austria", "at", "at")
BAHRAIN = ("bahrain", "bh")
BELGIUM = ("belgium", "be", "fr:be")
BULGARIA = ("bulgaria", "bg")
BRAZIL = ("brazil", "br", "com.br")
CANADA = ("canada", "ca", "ca")
CHILE = ("chile", "cl")
CHINA = ("china", "cn")
COLOMBIA = ("colombia", "co")
COSTARICA = ("costa rica", "cr")
CROATIA = ("croatia", "hr")
CYPRUS = ("cyprus", "cy")
CZECHREPUBLIC = ("czech republic,czechia", "cz")
DENMARK = ("denmark", "dk")
ECUADOR = ("ecuador", "ec")
EGYPT = ("egypt", "eg")
ESTONIA = ("estonia", "ee")
FINLAND = ("finland", "fi")
FRANCE = ("france", "fr", "fr")
GERMANY = ("germany", "de", "de")
@@ -91,6 +96,8 @@ class Country(Enum):
ITALY = ("italy", "it", "it")
JAPAN = ("japan", "jp")
KUWAIT = ("kuwait", "kw")
LATVIA = ("latvia", "lv")
LITHUANIA = ("lithuania", "lt")
LUXEMBOURG = ("luxembourg", "lu")
MALAYSIA = ("malaysia", "malaysia:my", "com")
MALTA = ("malta", "malta:mt", "mt")
@@ -111,6 +118,8 @@ class Country(Enum):
ROMANIA = ("romania", "ro")
SAUDIARABIA = ("saudi arabia", "sa")
SINGAPORE = ("singapore", "sg", "sg")
SLOVAKIA = ("slovakia", "sk")
SLOVENIA = ("slovenia", "sl")
SOUTHAFRICA = ("south africa", "za")
SOUTHKOREA = ("south korea", "kr")
SPAIN = ("spain", "es", "es")
@@ -265,3 +274,49 @@ class JobPost(BaseModel):
class JobResponse(BaseModel):
jobs: list[JobPost] = []
class Site(Enum):
LINKEDIN = "linkedin"
INDEED = "indeed"
ZIP_RECRUITER = "zip_recruiter"
GLASSDOOR = "glassdoor"
GOOGLE = "google"
BAYT = "bayt"
class SalarySource(Enum):
DIRECT_DATA = "direct_data"
DESCRIPTION = "description"
class ScraperInput(BaseModel):
site_type: list[Site]
search_term: str | None = None
google_search_term: str | None = None
location: str | None = None
country: Country | None = Country.USA
distance: int | None = None
is_remote: bool = False
job_type: JobType | None = None
easy_apply: bool | None = None
offset: int = 0
linkedin_fetch_description: bool = False
linkedin_company_ids: list[int] | None = None
description_format: DescriptionFormat | None = DescriptionFormat.MARKDOWN
results_wanted: int = 15
hours_old: int | None = None
class Scraper(ABC):
def __init__(
self, site: Site, proxies: list[str] | None = None, ca_cert: str | None = None
):
self.site = site
self.proxies = proxies
self.ca_cert = ca_cert
@abstractmethod
def scrape(self, scraper_input: ScraperInput) -> JobResponse: ...

View File

@@ -1,16 +1,19 @@
from __future__ import annotations
import re
import logging
import re
from itertools import cycle
import numpy as np
import requests
import tls_client
import numpy as np
import urllib3
from markdownify import markdownify as md
from requests.adapters import HTTPAdapter, Retry
from ..jobs import CompensationInterval, JobType
from jobspy.model import CompensationInterval, JobType, Site
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def create_logger(name: str):
@@ -129,7 +132,7 @@ def create_session(
return session
def set_logger_level(verbose: int = 2):
def set_logger_level(verbose: int):
"""
Adjusts the logger's level. This function allows the logging level to be changed at runtime.
@@ -283,3 +286,62 @@ def extract_job_type(description: str):
listing_types.append(key)
return listing_types if listing_types else None
def map_str_to_site(site_name: str) -> Site:
return Site[site_name.upper()]
def get_enum_from_value(value_str):
for job_type in JobType:
if value_str in job_type.value:
return job_type
raise Exception(f"Invalid job type: {value_str}")
def convert_to_annual(job_data: dict):
if job_data["interval"] == "hourly":
job_data["min_amount"] *= 2080
job_data["max_amount"] *= 2080
if job_data["interval"] == "monthly":
job_data["min_amount"] *= 12
job_data["max_amount"] *= 12
if job_data["interval"] == "weekly":
job_data["min_amount"] *= 52
job_data["max_amount"] *= 52
if job_data["interval"] == "daily":
job_data["min_amount"] *= 260
job_data["max_amount"] *= 260
job_data["interval"] = "yearly"
desired_order = [
"id",
"site",
"job_url",
"job_url_direct",
"title",
"company",
"location",
"date_posted",
"job_type",
"salary_source",
"interval",
"min_amount",
"max_amount",
"currency",
"is_remote",
"job_level",
"job_function",
"listing_type",
"emails",
"description",
"company_industry",
"company_url",
"company_logo",
"company_url_direct",
"company_addresses",
"company_num_employees",
"company_revenue",
"company_description",
]

View File

@@ -1,46 +1,39 @@
"""
jobspy.scrapers.ziprecruiter
~~~~~~~~~~~~~~~~~~~
This module contains routines to scrape ZipRecruiter.
"""
from __future__ import annotations
import json
import math
import re
import time
from datetime import datetime
from typing import Optional, Tuple, Any
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from bs4 import BeautifulSoup
from .constants import headers
from .. import Scraper, ScraperInput, Site
from ..utils import (
from jobspy.ziprecruiter.constant import headers, get_cookie_data
from jobspy.util import (
extract_emails_from_text,
create_session,
markdown_converter,
remove_attributes,
create_logger,
)
from ...jobs import (
from jobspy.model import (
JobPost,
Compensation,
Location,
JobResponse,
JobType,
Country,
DescriptionFormat,
Scraper,
ScraperInput,
Site,
)
from jobspy.ziprecruiter.util import get_job_type_enum, add_params
logger = create_logger("ZipRecruiter")
log = create_logger("ZipRecruiter")
class ZipRecruiterScraper(Scraper):
class ZipRecruiter(Scraper):
base_url = "https://www.ziprecruiter.com"
api_url = "https://api.ziprecruiter.com"
@@ -77,7 +70,7 @@ class ZipRecruiterScraper(Scraper):
break
if page > 1:
time.sleep(self.delay)
logger.info(f"search page: {page} / {max_pages}")
log.info(f"search page: {page} / {max_pages}")
jobs_on_page, continue_token = self._find_jobs_in_page(
scraper_input, continue_token
)
@@ -91,7 +84,7 @@ class ZipRecruiterScraper(Scraper):
def _find_jobs_in_page(
self, scraper_input: ScraperInput, continue_token: str | None = None
) -> Tuple[list[JobPost], Optional[str]]:
) -> tuple[list[JobPost], str | None]:
"""
Scrapes a page of ZipRecruiter for jobs with scraper_input criteria
:param scraper_input:
@@ -99,7 +92,7 @@ class ZipRecruiterScraper(Scraper):
:return: jobs found on page
"""
jobs_list = []
params = self._add_params(scraper_input)
params = add_params(scraper_input)
if continue_token:
params["continue_from"] = continue_token
try:
@@ -110,13 +103,13 @@ class ZipRecruiterScraper(Scraper):
else:
err = f"ZipRecruiter response status code {res.status_code}"
err += f" with response: {res.text}" # ZipRecruiter likely not available in EU
logger.error(err)
log.error(err)
return jobs_list, ""
except Exception as e:
if "Proxy responded with" in str(e):
logger.error(f"Indeed: Bad proxy")
log.error(f"Indeed: Bad proxy")
else:
logger.error(f"Indeed: {str(e)}")
log.error(f"Indeed: {str(e)}")
return jobs_list, ""
res_data = res.json()
@@ -152,7 +145,7 @@ class ZipRecruiterScraper(Scraper):
location = Location(
city=job.get("job_city"), state=job.get("job_state"), country=country_enum
)
job_type = self._get_job_type_enum(
job_type = get_job_type_enum(
job.get("employment_type", "").replace("_", "").lower()
)
date_posted = datetime.fromisoformat(job["posted_time"].rstrip("Z")).date()
@@ -201,13 +194,17 @@ class ZipRecruiterScraper(Scraper):
else ""
)
description_full = job_description_clean + company_description_clean
script_tag = soup.find("script", type="application/json")
if script_tag:
job_json = json.loads(script_tag.string)
job_url_val = job_json["model"].get("saveJobURL", "")
m = re.search(r"job_url=(.+)", job_url_val)
if m:
job_url_direct = m.group(1)
try:
script_tag = soup.find("script", type="application/json")
if script_tag:
job_json = json.loads(script_tag.string)
job_url_val = job_json["model"].get("saveJobURL", "")
m = re.search(r"job_url=(.+)", job_url_val)
if m:
job_url_direct = m.group(1)
except:
job_url_direct = None
if self.scraper_input.description_format == DescriptionFormat.MARKDOWN:
description_full = markdown_converter(description_full)
@@ -215,33 +212,8 @@ class ZipRecruiterScraper(Scraper):
return description_full, job_url_direct
def _get_cookies(self):
data = "event_type=session&logged_in=false&number_of_retry=1&property=model%3AiPhone&property=os%3AiOS&property=locale%3Aen_us&property=app_build_number%3A4734&property=app_version%3A91.0&property=manufacturer%3AApple&property=timestamp%3A2024-01-12T12%3A04%3A42-06%3A00&property=screen_height%3A852&property=os_version%3A16.6.1&property=source%3Ainstall&property=screen_width%3A393&property=device_model%3AiPhone%2014%20Pro&property=brand%3AApple"
"""
Sends a session event to the API with device properties.
"""
url = f"{self.api_url}/jobs-app/event"
self.session.post(url, data=data)
@staticmethod
def _get_job_type_enum(job_type_str: str) -> list[JobType] | None:
for job_type in JobType:
if job_type_str in job_type.value:
return [job_type]
return None
@staticmethod
def _add_params(scraper_input) -> dict[str, str | Any]:
params = {
"search": scraper_input.search_term,
"location": scraper_input.location,
}
if scraper_input.hours_old:
params["days"] = max(scraper_input.hours_old // 24, 1)
job_type_map = {JobType.FULL_TIME: "full_time", JobType.PART_TIME: "part_time"}
if scraper_input.job_type:
job_type = scraper_input.job_type
params["employment_type"] = job_type_map.get(job_type, job_type.value[0])
if scraper_input.easy_apply:
params["zipapply"] = 1
if scraper_input.is_remote:
params["remote"] = 1
if scraper_input.distance:
params["radius"] = scraper_input.distance
return {k: v for k, v in params.items() if v is not None}
self.session.post(url, data=get_cookie_data)

View File

@@ -0,0 +1,29 @@
headers = {
"Host": "api.ziprecruiter.com",
"accept": "*/*",
"x-zr-zva-override": "100000000;vid:ZT1huzm_EQlDTVEc",
"x-pushnotificationid": "0ff4983d38d7fc5b3370297f2bcffcf4b3321c418f5c22dd152a0264707602a0",
"x-deviceid": "D77B3A92-E589-46A4-8A39-6EF6F1D86006",
"user-agent": "Job Search/87.0 (iPhone; CPU iOS 16_6_1 like Mac OS X)",
"authorization": "Basic YTBlZjMyZDYtN2I0Yy00MWVkLWEyODMtYTI1NDAzMzI0YTcyOg==",
"accept-language": "en-US,en;q=0.9",
}
get_cookie_data = [
("event_type", "session"),
("logged_in", "false"),
("number_of_retry", "1"),
("property", "model:iPhone"),
("property", "os:iOS"),
("property", "locale:en_us"),
("property", "app_build_number:4734"),
("property", "app_version:91.0"),
("property", "manufacturer:Apple"),
("property", "timestamp:2025-01-12T12:04:42-06:00"),
("property", "screen_height:852"),
("property", "os_version:16.6.1"),
("property", "source:install"),
("property", "screen_width:393"),
("property", "device_model:iPhone 14 Pro"),
("property", "brand:Apple"),
]

View File

@@ -0,0 +1,31 @@
from jobspy.model import JobType
def add_params(scraper_input) -> dict[str, str | int]:
params: dict[str, str | int] = {
"search": scraper_input.search_term,
"location": scraper_input.location,
}
if scraper_input.hours_old:
params["days"] = max(scraper_input.hours_old // 24, 1)
job_type_map = {JobType.FULL_TIME: "full_time", JobType.PART_TIME: "part_time"}
if scraper_input.job_type:
job_type = scraper_input.job_type
params["employment_type"] = job_type_map.get(job_type, job_type.value[0])
if scraper_input.easy_apply:
params["zipapply"] = 1
if scraper_input.is_remote:
params["remote"] = 1
if scraper_input.distance:
params["radius"] = scraper_input.distance
return {k: v for k, v in params.items() if v is not None}
def get_job_type_enum(job_type_str: str) -> list[JobType] | None:
for job_type in JobType:
if job_type_str in job_type.value:
return [job_type]
return None

1159
jobspy_output.csv Normal file

File diff suppressed because it is too large Load Diff

236
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -1,2 +0,0 @@
[virtualenvs]
in-project = true

View File

@@ -1,18 +1,23 @@
[build-system]
requires = [ "poetry-core",]
build-backend = "poetry.core.masonry.api"
[tool.poetry]
name = "python-jobspy"
version = "1.1.73"
description = "Job scraper for LinkedIn, Indeed, Glassdoor & ZipRecruiter"
authors = ["Zachary Hampton <zachary@bunsly.com>", "Cullen Watson <cullen@bunsly.com>"]
homepage = "https://github.com/Bunsly/JobSpy"
version = "1.1.78"
description = "Job scraper for LinkedIn, Indeed, Glassdoor, ZipRecruiter & Bayt"
authors = ["Cullen Watson <cullen@cullenwatson.com>", "Zachary Hampton <zachary@zacharysproducts.com>"]
homepage = "https://github.com/cullenwatson/JobSpy"
readme = "README.md"
keywords = ['jobs-scraper', 'linkedin', 'indeed', 'glassdoor', 'ziprecruiter']
keywords = [ "jobs-scraper", "linkedin", "indeed", "glassdoor", "ziprecruiter", "bayt"]
[[tool.poetry.packages]]
include = "jobspy"
packages = [
{ include = "jobspy", from = "src" }
]
[tool.black]
line-length = 88
[tool.poetry.dependencies]
python = "^3.10"
python = "^3.10 || ^3.12"
requests = "^2.31.0"
beautifulsoup4 = "^4.12.2"
pandas = "^2.1.0"
@@ -22,16 +27,7 @@ tls-client = "^1.0.1"
markdownify = "^0.13.1"
regex = "^2024.4.28"
[tool.poetry.group.dev.dependencies]
pytest = "^7.4.1"
jupyter = "^1.0.0"
black = "*"
pre-commit = "*"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
[tool.black]
line-length = 88

118
requirements.txt Normal file
View File

@@ -0,0 +1,118 @@
annotated-types==0.7.0
anyio==4.6.2.post1
argon2-cffi==23.1.0
argon2-cffi-bindings==21.2.0
arrow==1.3.0
asttokens==2.4.1
async-lru==2.0.4
attrs==24.2.0
babel==2.16.0
beautifulsoup4==4.12.3
black==24.10.0
bleach==6.1.0
certifi==2024.8.30
cffi==1.17.1
cfgv==3.4.0
charset-normalizer==3.4.0
click==8.1.7
comm==0.2.2
debugpy==1.8.7
decorator==5.1.1
defusedxml==0.7.1
distlib==0.3.9
executing==2.1.0
fastjsonschema==2.20.0
filelock==3.16.1
fqdn==1.5.1
h11==0.14.0
httpcore==1.0.6
httpx==0.27.2
identify==2.6.1
idna==3.10
ipykernel==6.29.5
ipython==8.28.0
ipywidgets==8.1.5
isoduration==20.11.0
jedi==0.19.1
Jinja2==3.1.4
json5==0.9.25
jsonpointer==3.0.0
jsonschema==4.23.0
jsonschema-specifications==2024.10.1
jupyter==1.1.1
jupyter-console==6.6.3
jupyter-events==0.10.0
jupyter-lsp==2.2.5
jupyter_client==8.6.3
jupyter_core==5.7.2
jupyter_server==2.14.2
jupyter_server_terminals==0.5.3
jupyterlab==4.2.5
jupyterlab_pygments==0.3.0
jupyterlab_server==2.27.3
jupyterlab_widgets==3.0.13
markdownify==0.13.1
MarkupSafe==3.0.2
matplotlib-inline==0.1.7
mistune==3.0.2
mypy-extensions==1.0.0
nbclient==0.10.0
nbconvert==7.16.4
nbformat==5.10.4
nest-asyncio==1.6.0
nodeenv==1.9.1
notebook==7.2.2
notebook_shim==0.2.4
numpy==1.26.3
overrides==7.7.0
packaging==24.1
pandas==2.2.3
pandocfilters==1.5.1
parso==0.8.4
pathspec==0.12.1
pexpect==4.9.0
platformdirs==4.3.6
pre_commit==4.0.1
prometheus_client==0.21.0
prompt_toolkit==3.0.48
psutil==6.1.0
ptyprocess==0.7.0
pure_eval==0.2.3
pycparser==2.22
pydantic==2.9.2
pydantic_core==2.23.4
Pygments==2.18.0
python-dateutil==2.9.0.post0
-e git+https://github.com/fakebranden/JobSpy@60819a8fcabbd3eaba7741b673023612dc3d3692#egg=python_jobspy
python-json-logger==2.0.7
pytz==2024.2
PyYAML==6.0.2
pyzmq==26.2.0
referencing==0.35.1
regex==2024.9.11
requests==2.32.3
rfc3339-validator==0.1.4
rfc3986-validator==0.1.1
rpds-py==0.20.0
Send2Trash==1.8.3
setuptools==75.2.0
six==1.16.0
sniffio==1.3.1
soupsieve==2.6
stack-data==0.6.3
terminado==0.18.1
tinycss2==1.3.0
tls-client==1.0.1
tornado==6.4.1
traitlets==5.14.3
types-python-dateutil==2.9.0.20241003
typing_extensions==4.12.2
tzdata==2024.2
uri-template==1.3.0
urllib3==2.2.3
virtualenv==20.27.0
wcwidth==0.2.13
webcolors==24.8.0
webencodings==0.5.1
websocket-client==1.8.0
widgetsnbextension==4.0.13

View File

@@ -1,56 +0,0 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from ..jobs import (
Enum,
BaseModel,
JobType,
JobResponse,
Country,
DescriptionFormat,
)
class Site(Enum):
LINKEDIN = "linkedin"
INDEED = "indeed"
ZIP_RECRUITER = "zip_recruiter"
GLASSDOOR = "glassdoor"
GOOGLE = "google"
class SalarySource(Enum):
DIRECT_DATA = "direct_data"
DESCRIPTION = "description"
class ScraperInput(BaseModel):
site_type: list[Site]
search_term: str | None = None
location: str | None = None
country: Country | None = Country.USA
distance: int | None = None
is_remote: bool = False
job_type: JobType | None = None
easy_apply: bool | None = None
offset: int = 0
linkedin_fetch_description: bool = False
linkedin_company_ids: list[int] | None = None
description_format: DescriptionFormat | None = DescriptionFormat.MARKDOWN
results_wanted: int = 15
hours_old: int | None = None
class Scraper(ABC):
def __init__(
self, site: Site, proxies: list[str] | None = None, ca_cert: str | None = None
):
self.site = site
self.proxies = proxies
self.ca_cert = ca_cert
@abstractmethod
def scrape(self, scraper_input: ScraperInput) -> JobResponse: ...

View File

@@ -1,10 +0,0 @@
headers = {
"Host": "api.ziprecruiter.com",
"accept": "*/*",
"x-zr-zva-override": "100000000;vid:ZT1huzm_EQlDTVEc",
"x-pushnotificationid": "0ff4983d38d7fc5b3370297f2bcffcf4b3321c418f5c22dd152a0264707602a0",
"x-deviceid": "D77B3A92-E589-46A4-8A39-6EF6F1D86006",
"user-agent": "Job Search/87.0 (iPhone; CPU iOS 16_6_1 like Mac OS X)",
"authorization": "Basic YTBlZjMyZDYtN2I0Yy00MWVkLWEyODMtYTI1NDAzMzI0YTcyOg==",
"accept-language": "en-US,en;q=0.9",
}

View File

@@ -1,18 +0,0 @@
from jobspy import scrape_jobs
import pandas as pd
def test_all():
sites = [
"indeed",
"glassdoor",
] # ziprecruiter/linkedin needs good ip, and temp fix to pass test on ci
result = scrape_jobs(
site_name=sites,
search_term="engineer",
results_wanted=5,
)
assert (
isinstance(result, pd.DataFrame) and len(result) == len(sites) * 5
), "Result should be a non-empty DataFrame"

View File

@@ -1,13 +0,0 @@
from jobspy import scrape_jobs
import pandas as pd
def test_glassdoor():
result = scrape_jobs(
site_name="glassdoor",
search_term="engineer",
results_wanted=5,
)
assert (
isinstance(result, pd.DataFrame) and len(result) == 5
), "Result should be a non-empty DataFrame"

View File

@@ -1,12 +0,0 @@
from jobspy import scrape_jobs
import pandas as pd
def test_google():
result = scrape_jobs(
site_name="google", search_term="software engineer", results_wanted=5
)
assert (
isinstance(result, pd.DataFrame) and len(result) == 5
), "Result should be a non-empty DataFrame"

View File

@@ -1,13 +0,0 @@
from jobspy import scrape_jobs
import pandas as pd
def test_indeed():
result = scrape_jobs(
site_name="indeed",
search_term="engineer",
results_wanted=5,
)
assert (
isinstance(result, pd.DataFrame) and len(result) == 5
), "Result should be a non-empty DataFrame"

View File

@@ -1,9 +0,0 @@
from jobspy import scrape_jobs
import pandas as pd
def test_linkedin():
result = scrape_jobs(site_name="linkedin", search_term="engineer", results_wanted=5)
assert (
isinstance(result, pd.DataFrame) and len(result) == 5
), "Result should be a non-empty DataFrame"

View File

@@ -1,12 +0,0 @@
from jobspy import scrape_jobs
import pandas as pd
def test_ziprecruiter():
result = scrape_jobs(
site_name="zip_recruiter", search_term="software engineer", results_wanted=5
)
assert (
isinstance(result, pd.DataFrame) and len(result) == 5
), "Result should be a non-empty DataFrame"