refactor:organize code

This commit is contained in:
Cullen Watson
2025-02-21 14:14:55 -06:00
parent df70d4bc2e
commit 4ec308a302
25 changed files with 569 additions and 624 deletions

202
jobspy/__init__.py Normal file
View File

@@ -0,0 +1,202 @@
from __future__ import annotations
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Tuple
import pandas as pd
from jobspy.bayt import BaytScraper
from jobspy.glassdoor import Glassdoor
from jobspy.google import Google
from jobspy.indeed import Indeed
from jobspy.linkedin import LinkedIn
from jobspy.model import JobType, Location, JobResponse, Country
from jobspy.model import SalarySource, ScraperInput, Site
from jobspy.util import (
set_logger_level,
extract_salary,
create_logger,
get_enum_from_value,
map_str_to_site,
convert_to_annual,
desired_order,
)
from jobspy.ziprecruiter import ZipRecruiter
def scrape_jobs(
site_name: str | list[str] | Site | list[Site] | None = None,
search_term: str | None = None,
google_search_term: str | None = None,
location: str | None = None,
distance: int | None = 50,
is_remote: bool = False,
job_type: str | None = None,
easy_apply: bool | None = None,
results_wanted: int = 15,
country_indeed: str = "usa",
proxies: list[str] | str | None = None,
ca_cert: str | None = None,
description_format: str = "markdown",
linkedin_fetch_description: bool | None = False,
linkedin_company_ids: list[int] | None = None,
offset: int | None = 0,
hours_old: int = None,
enforce_annual_salary: bool = False,
verbose: int = 0,
**kwargs,
) -> pd.DataFrame:
"""
Scrapes job data from job boards concurrently
:return: Pandas DataFrame containing job data
"""
SCRAPER_MAPPING = {
Site.LINKEDIN: LinkedIn,
Site.INDEED: Indeed,
Site.ZIP_RECRUITER: ZipRecruiter,
Site.GLASSDOOR: Glassdoor,
Site.GOOGLE: Google,
Site.BAYT: BaytScraper,
}
set_logger_level(verbose)
job_type = get_enum_from_value(job_type) if job_type else None
def get_site_type():
site_types = list(Site)
if isinstance(site_name, str):
site_types = [map_str_to_site(site_name)]
elif isinstance(site_name, Site):
site_types = [site_name]
elif isinstance(site_name, list):
site_types = [
map_str_to_site(site) if isinstance(site, str) else site
for site in site_name
]
return site_types
country_enum = Country.from_string(country_indeed)
scraper_input = ScraperInput(
site_type=get_site_type(),
country=country_enum,
search_term=search_term,
google_search_term=google_search_term,
location=location,
distance=distance,
is_remote=is_remote,
job_type=job_type,
easy_apply=easy_apply,
description_format=description_format,
linkedin_fetch_description=linkedin_fetch_description,
results_wanted=results_wanted,
linkedin_company_ids=linkedin_company_ids,
offset=offset,
hours_old=hours_old,
)
def scrape_site(site: Site) -> Tuple[str, JobResponse]:
scraper_class = SCRAPER_MAPPING[site]
scraper = scraper_class(proxies=proxies, ca_cert=ca_cert)
scraped_data: JobResponse = scraper.scrape(scraper_input)
cap_name = site.value.capitalize()
site_name = "ZipRecruiter" if cap_name == "Zip_recruiter" else cap_name
create_logger(site_name).info(f"finished scraping")
return site.value, scraped_data
site_to_jobs_dict = {}
def worker(site):
site_val, scraped_info = scrape_site(site)
return site_val, scraped_info
with ThreadPoolExecutor() as executor:
future_to_site = {
executor.submit(worker, site): site for site in scraper_input.site_type
}
for future in as_completed(future_to_site):
site_value, scraped_data = future.result()
site_to_jobs_dict[site_value] = scraped_data
jobs_dfs: list[pd.DataFrame] = []
for site, job_response in site_to_jobs_dict.items():
for job in job_response.jobs:
job_data = job.dict()
job_url = job_data["job_url"]
job_data["site"] = site
job_data["company"] = job_data["company_name"]
job_data["job_type"] = (
", ".join(job_type.value[0] for job_type in job_data["job_type"])
if job_data["job_type"]
else None
)
job_data["emails"] = (
", ".join(job_data["emails"]) if job_data["emails"] else None
)
if job_data["location"]:
job_data["location"] = Location(
**job_data["location"]
).display_location()
compensation_obj = job_data.get("compensation")
if compensation_obj and isinstance(compensation_obj, dict):
job_data["interval"] = (
compensation_obj.get("interval").value
if compensation_obj.get("interval")
else None
)
job_data["min_amount"] = compensation_obj.get("min_amount")
job_data["max_amount"] = compensation_obj.get("max_amount")
job_data["currency"] = compensation_obj.get("currency", "USD")
job_data["salary_source"] = SalarySource.DIRECT_DATA.value
if enforce_annual_salary and (
job_data["interval"]
and job_data["interval"] != "yearly"
and job_data["min_amount"]
and job_data["max_amount"]
):
convert_to_annual(job_data)
else:
if country_enum == Country.USA:
(
job_data["interval"],
job_data["min_amount"],
job_data["max_amount"],
job_data["currency"],
) = extract_salary(
job_data["description"],
enforce_annual_salary=enforce_annual_salary,
)
job_data["salary_source"] = SalarySource.DESCRIPTION.value
job_data["salary_source"] = (
job_data["salary_source"]
if "min_amount" in job_data and job_data["min_amount"]
else None
)
job_df = pd.DataFrame([job_data])
jobs_dfs.append(job_df)
if jobs_dfs:
# Step 1: Filter out all-NA columns from each DataFrame before concatenation
filtered_dfs = [df.dropna(axis=1, how="all") for df in jobs_dfs]
# Step 2: Concatenate the filtered DataFrames
jobs_df = pd.concat(filtered_dfs, ignore_index=True)
# Step 3: Ensure all desired columns are present, adding missing ones as empty
for column in desired_order:
if column not in jobs_df.columns:
jobs_df[column] = None # Add missing columns as empty
# Reorder the DataFrame according to the desired order
jobs_df = jobs_df[desired_order]
# Step 4: Sort the DataFrame as required
return jobs_df.sort_values(
by=["site", "date_posted"], ascending=[True, False]
).reset_index(drop=True)
else:
return pd.DataFrame()

145
jobspy/bayt/__init__.py Normal file
View File

@@ -0,0 +1,145 @@
from __future__ import annotations
import random
import time
from bs4 import BeautifulSoup
from jobspy.model import (
Scraper,
ScraperInput,
Site,
JobPost,
JobResponse,
Location,
Country,
)
from jobspy.util import create_logger, create_session
log = create_logger("Bayt")
class BaytScraper(Scraper):
base_url = "https://www.bayt.com"
delay = 2
band_delay = 3
def __init__(
self, proxies: list[str] | str | None = None, ca_cert: str | None = None
):
super().__init__(Site.BAYT, proxies=proxies, ca_cert=ca_cert)
self.scraper_input = None
self.session = None
self.country = "worldwide"
def scrape(self, scraper_input: ScraperInput) -> JobResponse:
self.scraper_input = scraper_input
self.session = create_session(
proxies=self.proxies, ca_cert=self.ca_cert, is_tls=False, has_retry=True
)
job_list: list[JobPost] = []
page = 1
results_wanted = (
scraper_input.results_wanted if scraper_input.results_wanted else 10
)
while len(job_list) < results_wanted:
log.info(f"Fetching Bayt jobs page {page}")
job_elements = self._fetch_jobs(self.scraper_input.search_term, page)
if not job_elements:
break
if job_elements:
log.debug(
"First job element snippet:\n" + job_elements[0].prettify()[:500]
)
initial_count = len(job_list)
for job in job_elements:
try:
job_post = self._extract_job_info(job)
if job_post:
job_list.append(job_post)
if len(job_list) >= results_wanted:
break
else:
log.debug(
"Extraction returned None. Job snippet:\n"
+ job.prettify()[:500]
)
except Exception as e:
log.error(f"Bayt: Error extracting job info: {str(e)}")
continue
if len(job_list) == initial_count:
log.info(f"No new jobs found on page {page}. Ending pagination.")
break
page += 1
time.sleep(random.uniform(self.delay, self.delay + self.band_delay))
job_list = job_list[: scraper_input.results_wanted]
return JobResponse(jobs=job_list)
def _fetch_jobs(self, query: str, page: int) -> list | None:
"""
Grabs the job results for the given query and page number.
"""
try:
url = f"{self.base_url}/en/international/jobs/{query}-jobs/?page={page}"
response = self.session.get(url)
response.raise_for_status()
soup = BeautifulSoup(response.text, "html.parser")
job_listings = soup.find_all("li", attrs={"data-js-job": ""})
log.debug(f"Found {len(job_listings)} job listing elements")
return job_listings
except Exception as e:
log.error(f"Bayt: Error fetching jobs - {str(e)}")
return None
def _extract_job_info(self, job: BeautifulSoup) -> JobPost | None:
"""
Extracts the job information from a single job listing.
"""
# Find the h2 element holding the title and link (no class filtering)
job_general_information = job.find("h2")
if not job_general_information:
return
job_title = job_general_information.get_text(strip=True)
job_url = self._extract_job_url(job_general_information)
if not job_url:
return
# Extract company name using the original approach:
company_tag = job.find("div", class_="t-nowrap p10l")
company_name = (
company_tag.find("span").get_text(strip=True)
if company_tag and company_tag.find("span")
else None
)
# Extract location using the original approach:
location_tag = job.find("div", class_="t-mute t-small")
location = location_tag.get_text(strip=True) if location_tag else None
job_id = f"bayt-{abs(hash(job_url))}"
location_obj = Location(
city=location,
country=Country.from_string(self.country),
)
return JobPost(
id=job_id,
title=job_title,
company_name=company_name,
location=location_obj,
job_url=job_url,
)
def _extract_job_url(self, job_general_information: BeautifulSoup) -> str | None:
"""
Pulls the job URL from the 'a' within the h2 element.
"""
a_tag = job_general_information.find("a")
if a_tag and a_tag.has_attr("href"):
return self.base_url + a_tag["href"].strip()

36
jobspy/exception.py Normal file
View File

@@ -0,0 +1,36 @@
"""
jobspy.jobboard.exceptions
~~~~~~~~~~~~~~~~~~~
This module contains the set of Scrapers' exceptions.
"""
class LinkedInException(Exception):
def __init__(self, message=None):
super().__init__(message or "An error occurred with LinkedIn")
class IndeedException(Exception):
def __init__(self, message=None):
super().__init__(message or "An error occurred with Indeed")
class ZipRecruiterException(Exception):
def __init__(self, message=None):
super().__init__(message or "An error occurred with ZipRecruiter")
class GlassdoorException(Exception):
def __init__(self, message=None):
super().__init__(message or "An error occurred with Glassdoor")
class GoogleJobsException(Exception):
def __init__(self, message=None):
super().__init__(message or "An error occurred with Google Jobs")
class BaytException(Exception):
def __init__(self, message=None):
super().__init__(message or "An error occurred with Bayt")

View File

@@ -0,0 +1,320 @@
from __future__ import annotations
import re
import json
import requests
from typing import Tuple
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
from jobspy.glassdoor.constant import fallback_token, query_template, headers
from jobspy.glassdoor.util import (
get_cursor_for_page,
parse_compensation,
parse_location,
)
from jobspy.util import (
extract_emails_from_text,
create_logger,
create_session,
markdown_converter,
)
from jobspy.exception import GlassdoorException
from jobspy.model import (
JobPost,
JobResponse,
DescriptionFormat,
Scraper,
ScraperInput,
Site,
)
log = create_logger("Glassdoor")
class Glassdoor(Scraper):
def __init__(
self, proxies: list[str] | str | None = None, ca_cert: str | None = None
):
"""
Initializes GlassdoorScraper with the Glassdoor job search url
"""
site = Site(Site.GLASSDOOR)
super().__init__(site, proxies=proxies, ca_cert=ca_cert)
self.base_url = None
self.country = None
self.session = None
self.scraper_input = None
self.jobs_per_page = 30
self.max_pages = 30
self.seen_urls = set()
def scrape(self, scraper_input: ScraperInput) -> JobResponse:
"""
Scrapes Glassdoor for jobs with scraper_input criteria.
:param scraper_input: Information about job search criteria.
:return: JobResponse containing a list of jobs.
"""
self.scraper_input = scraper_input
self.scraper_input.results_wanted = min(900, scraper_input.results_wanted)
self.base_url = self.scraper_input.country.get_glassdoor_url()
self.session = create_session(
proxies=self.proxies, ca_cert=self.ca_cert, has_retry=True
)
token = self._get_csrf_token()
headers["gd-csrf-token"] = token if token else fallback_token
self.session.headers.update(headers)
location_id, location_type = self._get_location(
scraper_input.location, scraper_input.is_remote
)
if location_type is None:
log.error("Glassdoor: location not parsed")
return JobResponse(jobs=[])
job_list: list[JobPost] = []
cursor = None
range_start = 1 + (scraper_input.offset // self.jobs_per_page)
tot_pages = (scraper_input.results_wanted // self.jobs_per_page) + 2
range_end = min(tot_pages, self.max_pages + 1)
for page in range(range_start, range_end):
log.info(f"search page: {page} / {range_end - 1}")
try:
jobs, cursor = self._fetch_jobs_page(
scraper_input, location_id, location_type, page, cursor
)
job_list.extend(jobs)
if not jobs or len(job_list) >= scraper_input.results_wanted:
job_list = job_list[: scraper_input.results_wanted]
break
except Exception as e:
log.error(f"Glassdoor: {str(e)}")
break
return JobResponse(jobs=job_list)
def _fetch_jobs_page(
self,
scraper_input: ScraperInput,
location_id: int,
location_type: str,
page_num: int,
cursor: str | None,
) -> Tuple[list[JobPost], str | None]:
"""
Scrapes a page of Glassdoor for jobs with scraper_input criteria
"""
jobs = []
self.scraper_input = scraper_input
try:
payload = self._add_payload(location_id, location_type, page_num, cursor)
response = self.session.post(
f"{self.base_url}/graph",
timeout_seconds=15,
data=payload,
)
if response.status_code != 200:
exc_msg = f"bad response status code: {response.status_code}"
raise GlassdoorException(exc_msg)
res_json = response.json()[0]
if "errors" in res_json:
raise ValueError("Error encountered in API response")
except (
requests.exceptions.ReadTimeout,
GlassdoorException,
ValueError,
Exception,
) as e:
log.error(f"Glassdoor: {str(e)}")
return jobs, None
jobs_data = res_json["data"]["jobListings"]["jobListings"]
with ThreadPoolExecutor(max_workers=self.jobs_per_page) as executor:
future_to_job_data = {
executor.submit(self._process_job, job): job for job in jobs_data
}
for future in as_completed(future_to_job_data):
try:
job_post = future.result()
if job_post:
jobs.append(job_post)
except Exception as exc:
raise GlassdoorException(f"Glassdoor generated an exception: {exc}")
return jobs, get_cursor_for_page(
res_json["data"]["jobListings"]["paginationCursors"], page_num + 1
)
def _get_csrf_token(self):
"""
Fetches csrf token needed for API by visiting a generic page
"""
res = self.session.get(f"{self.base_url}/Job/computer-science-jobs.htm")
pattern = r'"token":\s*"([^"]+)"'
matches = re.findall(pattern, res.text)
token = None
if matches:
token = matches[0]
return token
def _process_job(self, job_data):
"""
Processes a single job and fetches its description.
"""
job_id = job_data["jobview"]["job"]["listingId"]
job_url = f"{self.base_url}job-listing/j?jl={job_id}"
if job_url in self.seen_urls:
return None
self.seen_urls.add(job_url)
job = job_data["jobview"]
title = job["job"]["jobTitleText"]
company_name = job["header"]["employerNameFromSearch"]
company_id = job_data["jobview"]["header"]["employer"]["id"]
location_name = job["header"].get("locationName", "")
location_type = job["header"].get("locationType", "")
age_in_days = job["header"].get("ageInDays")
is_remote, location = False, None
date_diff = (datetime.now() - timedelta(days=age_in_days)).date()
date_posted = date_diff if age_in_days is not None else None
if location_type == "S":
is_remote = True
else:
location = parse_location(location_name)
compensation = parse_compensation(job["header"])
try:
description = self._fetch_job_description(job_id)
except:
description = None
company_url = f"{self.base_url}Overview/W-EI_IE{company_id}.htm"
company_logo = (
job_data["jobview"].get("overview", {}).get("squareLogoUrl", None)
)
listing_type = (
job_data["jobview"]
.get("header", {})
.get("adOrderSponsorshipLevel", "")
.lower()
)
return JobPost(
id=f"gd-{job_id}",
title=title,
company_url=company_url if company_id else None,
company_name=company_name,
date_posted=date_posted,
job_url=job_url,
location=location,
compensation=compensation,
is_remote=is_remote,
description=description,
emails=extract_emails_from_text(description) if description else None,
company_logo=company_logo,
listing_type=listing_type,
)
def _fetch_job_description(self, job_id):
"""
Fetches the job description for a single job ID.
"""
url = f"{self.base_url}/graph"
body = [
{
"operationName": "JobDetailQuery",
"variables": {
"jl": job_id,
"queryString": "q",
"pageTypeEnum": "SERP",
},
"query": """
query JobDetailQuery($jl: Long!, $queryString: String, $pageTypeEnum: PageTypeEnum) {
jobview: jobView(
listingId: $jl
contextHolder: {queryString: $queryString, pageTypeEnum: $pageTypeEnum}
) {
job {
description
__typename
}
__typename
}
}
""",
}
]
res = requests.post(url, json=body, headers=headers)
if res.status_code != 200:
return None
data = res.json()[0]
desc = data["data"]["jobview"]["job"]["description"]
if self.scraper_input.description_format == DescriptionFormat.MARKDOWN:
desc = markdown_converter(desc)
return desc
def _get_location(self, location: str, is_remote: bool) -> (int, str):
if not location or is_remote:
return "11047", "STATE" # remote options
url = f"{self.base_url}/findPopularLocationAjax.htm?maxLocationsToReturn=10&term={location}"
res = self.session.get(url)
if res.status_code != 200:
if res.status_code == 429:
err = f"429 Response - Blocked by Glassdoor for too many requests"
log.error(err)
return None, None
else:
err = f"Glassdoor response status code {res.status_code}"
err += f" - {res.text}"
log.error(f"Glassdoor response status code {res.status_code}")
return None, None
items = res.json()
if not items:
raise ValueError(f"Location '{location}' not found on Glassdoor")
location_type = items[0]["locationType"]
if location_type == "C":
location_type = "CITY"
elif location_type == "S":
location_type = "STATE"
elif location_type == "N":
location_type = "COUNTRY"
return int(items[0]["locationId"]), location_type
def _add_payload(
self,
location_id: int,
location_type: str,
page_num: int,
cursor: str | None = None,
) -> str:
fromage = None
if self.scraper_input.hours_old:
fromage = max(self.scraper_input.hours_old // 24, 1)
filter_params = []
if self.scraper_input.easy_apply:
filter_params.append({"filterKey": "applicationType", "values": "1"})
if fromage:
filter_params.append({"filterKey": "fromAge", "values": str(fromage)})
payload = {
"operationName": "JobSearchResultsQuery",
"variables": {
"excludeJobListingIds": [],
"filterParams": filter_params,
"keyword": self.scraper_input.search_term,
"numJobsToShow": 30,
"locationType": location_type,
"locationId": int(location_id),
"parameterUrlInput": f"IL.0,12_I{location_type}{location_id}",
"pageNumber": page_num,
"pageCursor": cursor,
"fromage": fromage,
"sort": "date",
},
"query": query_template,
}
if self.scraper_input.job_type:
payload["variables"]["filterParams"].append(
{"filterKey": "jobType", "values": self.scraper_input.job_type.value[0]}
)
return json.dumps([payload])

View File

@@ -0,0 +1,184 @@
headers = {
"authority": "www.glassdoor.com",
"accept": "*/*",
"accept-language": "en-US,en;q=0.9",
"apollographql-client-name": "job-search-next",
"apollographql-client-version": "4.65.5",
"content-type": "application/json",
"origin": "https://www.glassdoor.com",
"referer": "https://www.glassdoor.com/",
"sec-ch-ua": '"Chromium";v="118", "Google Chrome";v="118", "Not=A?Brand";v="99"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-platform": '"macOS"',
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-origin",
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36",
}
query_template = """
query JobSearchResultsQuery(
$excludeJobListingIds: [Long!],
$keyword: String,
$locationId: Int,
$locationType: LocationTypeEnum,
$numJobsToShow: Int!,
$pageCursor: String,
$pageNumber: Int,
$filterParams: [FilterParams],
$originalPageUrl: String,
$seoFriendlyUrlInput: String,
$parameterUrlInput: String,
$seoUrl: Boolean
) {
jobListings(
contextHolder: {
searchParams: {
excludeJobListingIds: $excludeJobListingIds,
keyword: $keyword,
locationId: $locationId,
locationType: $locationType,
numPerPage: $numJobsToShow,
pageCursor: $pageCursor,
pageNumber: $pageNumber,
filterParams: $filterParams,
originalPageUrl: $originalPageUrl,
seoFriendlyUrlInput: $seoFriendlyUrlInput,
parameterUrlInput: $parameterUrlInput,
seoUrl: $seoUrl,
searchType: SR
}
}
) {
companyFilterOptions {
id
shortName
__typename
}
filterOptions
indeedCtk
jobListings {
...JobView
__typename
}
jobListingSeoLinks {
linkItems {
position
url
__typename
}
__typename
}
jobSearchTrackingKey
jobsPageSeoData {
pageMetaDescription
pageTitle
__typename
}
paginationCursors {
cursor
pageNumber
__typename
}
indexablePageForSeo
searchResultsMetadata {
searchCriteria {
implicitLocation {
id
localizedDisplayName
type
__typename
}
keyword
location {
id
shortName
localizedShortName
localizedDisplayName
type
__typename
}
__typename
}
helpCenterDomain
helpCenterLocale
jobSerpJobOutlook {
occupation
paragraph
__typename
}
showMachineReadableJobs
__typename
}
totalJobsCount
__typename
}
}
fragment JobView on JobListingSearchResult {
jobview {
header {
adOrderId
advertiserType
adOrderSponsorshipLevel
ageInDays
divisionEmployerName
easyApply
employer {
id
name
shortName
__typename
}
employerNameFromSearch
goc
gocConfidence
gocId
jobCountryId
jobLink
jobResultTrackingKey
jobTitleText
locationName
locationType
locId
needsCommission
payCurrency
payPeriod
payPeriodAdjustedPay {
p10
p50
p90
__typename
}
rating
salarySource
savedJobId
sponsored
__typename
}
job {
description
importConfigId
jobTitleId
jobTitleText
listingId
__typename
}
jobListingAdminDetails {
cpcVal
importConfigId
jobListingId
jobSourceId
userEligibleForAdminJobDetails
__typename
}
overview {
shortName
squareLogoUrl
__typename
}
__typename
}
__typename
}
"""
fallback_token = "Ft6oHEWlRZrxDww95Cpazw:0pGUrkb2y3TyOpAIqF2vbPmUXoXVkD3oEGDVkvfeCerceQ5-n8mBg3BovySUIjmCPHCaW0H2nQVdqzbtsYqf4Q:wcqRqeegRUa9MVLJGyujVXB7vWFPjdaS1CtrrzJq-ok"

42
jobspy/glassdoor/util.py Normal file
View File

@@ -0,0 +1,42 @@
from jobspy.model import Compensation, CompensationInterval, Location, JobType
def parse_compensation(data: dict) -> Compensation | None:
pay_period = data.get("payPeriod")
adjusted_pay = data.get("payPeriodAdjustedPay")
currency = data.get("payCurrency", "USD")
if not pay_period or not adjusted_pay:
return None
interval = None
if pay_period == "ANNUAL":
interval = CompensationInterval.YEARLY
elif pay_period:
interval = CompensationInterval.get_interval(pay_period)
min_amount = int(adjusted_pay.get("p10") // 1)
max_amount = int(adjusted_pay.get("p90") // 1)
return Compensation(
interval=interval,
min_amount=min_amount,
max_amount=max_amount,
currency=currency,
)
def get_job_type_enum(job_type_str: str) -> list[JobType] | None:
for job_type in JobType:
if job_type_str in job_type.value:
return [job_type]
def parse_location(location_name: str) -> Location | None:
if not location_name or location_name == "Remote":
return
city, _, state = location_name.partition(", ")
return Location(city=city, state=state)
def get_cursor_for_page(pagination_cursors, page_num):
for cursor_data in pagination_cursors:
if cursor_data["pageNumber"] == page_num:
return cursor_data["cursor"]

202
jobspy/google/__init__.py Normal file
View File

@@ -0,0 +1,202 @@
from __future__ import annotations
import math
import re
import json
from typing import Tuple
from datetime import datetime, timedelta
from jobspy.google.constant import headers_jobs, headers_initial, async_param
from jobspy.model import (
Scraper,
ScraperInput,
Site,
JobPost,
JobResponse,
Location,
JobType,
)
from jobspy.util import extract_emails_from_text, extract_job_type, create_session
from jobspy.google.util import log, find_job_info_initial_page, find_job_info
class Google(Scraper):
def __init__(
self, proxies: list[str] | str | None = None, ca_cert: str | None = None
):
"""
Initializes Google Scraper with the Goodle jobs search url
"""
site = Site(Site.GOOGLE)
super().__init__(site, proxies=proxies, ca_cert=ca_cert)
self.country = None
self.session = None
self.scraper_input = None
self.jobs_per_page = 10
self.seen_urls = set()
self.url = "https://www.google.com/search"
self.jobs_url = "https://www.google.com/async/callback:550"
def scrape(self, scraper_input: ScraperInput) -> JobResponse:
"""
Scrapes Google for jobs with scraper_input criteria.
:param scraper_input: Information about job search criteria.
:return: JobResponse containing a list of jobs.
"""
self.scraper_input = scraper_input
self.scraper_input.results_wanted = min(900, scraper_input.results_wanted)
self.session = create_session(
proxies=self.proxies, ca_cert=self.ca_cert, is_tls=False, has_retry=True
)
forward_cursor, job_list = self._get_initial_cursor_and_jobs()
if forward_cursor is None:
log.warning(
"initial cursor not found, try changing your query or there was at most 10 results"
)
return JobResponse(jobs=job_list)
page = 1
while (
len(self.seen_urls) < scraper_input.results_wanted + scraper_input.offset
and forward_cursor
):
log.info(
f"search page: {page} / {math.ceil(scraper_input.results_wanted / self.jobs_per_page)}"
)
try:
jobs, forward_cursor = self._get_jobs_next_page(forward_cursor)
except Exception as e:
log.error(f"failed to get jobs on page: {page}, {e}")
break
if not jobs:
log.info(f"found no jobs on page: {page}")
break
job_list += jobs
page += 1
return JobResponse(
jobs=job_list[
scraper_input.offset : scraper_input.offset
+ scraper_input.results_wanted
]
)
def _get_initial_cursor_and_jobs(self) -> Tuple[str, list[JobPost]]:
"""Gets initial cursor and jobs to paginate through job listings"""
query = f"{self.scraper_input.search_term} jobs"
def get_time_range(hours_old):
if hours_old <= 24:
return "since yesterday"
elif hours_old <= 72:
return "in the last 3 days"
elif hours_old <= 168:
return "in the last week"
else:
return "in the last month"
job_type_mapping = {
JobType.FULL_TIME: "Full time",
JobType.PART_TIME: "Part time",
JobType.INTERNSHIP: "Internship",
JobType.CONTRACT: "Contract",
}
if self.scraper_input.job_type in job_type_mapping:
query += f" {job_type_mapping[self.scraper_input.job_type]}"
if self.scraper_input.location:
query += f" near {self.scraper_input.location}"
if self.scraper_input.hours_old:
time_filter = get_time_range(self.scraper_input.hours_old)
query += f" {time_filter}"
if self.scraper_input.is_remote:
query += " remote"
if self.scraper_input.google_search_term:
query = self.scraper_input.google_search_term
params = {"q": query, "udm": "8"}
response = self.session.get(self.url, headers=headers_initial, params=params)
pattern_fc = r'<div jsname="Yust4d"[^>]+data-async-fc="([^"]+)"'
match_fc = re.search(pattern_fc, response.text)
data_async_fc = match_fc.group(1) if match_fc else None
jobs_raw = find_job_info_initial_page(response.text)
jobs = []
for job_raw in jobs_raw:
job_post = self._parse_job(job_raw)
if job_post:
jobs.append(job_post)
return data_async_fc, jobs
def _get_jobs_next_page(self, forward_cursor: str) -> Tuple[list[JobPost], str]:
params = {"fc": [forward_cursor], "fcv": ["3"], "async": [async_param]}
response = self.session.get(self.jobs_url, headers=headers_jobs, params=params)
return self._parse_jobs(response.text)
def _parse_jobs(self, job_data: str) -> Tuple[list[JobPost], str]:
"""
Parses jobs on a page with next page cursor
"""
start_idx = job_data.find("[[[")
end_idx = job_data.rindex("]]]") + 3
s = job_data[start_idx:end_idx]
parsed = json.loads(s)[0]
pattern_fc = r'data-async-fc="([^"]+)"'
match_fc = re.search(pattern_fc, job_data)
data_async_fc = match_fc.group(1) if match_fc else None
jobs_on_page = []
for array in parsed:
_, job_data = array
if not job_data.startswith("[[["):
continue
job_d = json.loads(job_data)
job_info = find_job_info(job_d)
job_post = self._parse_job(job_info)
if job_post:
jobs_on_page.append(job_post)
return jobs_on_page, data_async_fc
def _parse_job(self, job_info: list):
job_url = job_info[3][0][0] if job_info[3] and job_info[3][0] else None
if job_url in self.seen_urls:
return
self.seen_urls.add(job_url)
title = job_info[0]
company_name = job_info[1]
location = city = job_info[2]
state = country = date_posted = None
if location and "," in location:
city, state, *country = [*map(lambda x: x.strip(), location.split(","))]
days_ago_str = job_info[12]
if type(days_ago_str) == str:
match = re.search(r"\d+", days_ago_str)
days_ago = int(match.group()) if match else None
date_posted = (datetime.now() - timedelta(days=days_ago)).date()
description = job_info[19]
job_post = JobPost(
id=f"go-{job_info[28]}",
title=title,
company_name=company_name,
location=Location(
city=city, state=state, country=country[0] if country else None
),
job_url=job_url,
date_posted=date_posted,
is_remote="remote" in description.lower() or "wfh" in description.lower(),
description=description,
emails=extract_emails_from_text(description),
job_type=extract_job_type(description),
)
return job_post

52
jobspy/google/constant.py Normal file
View File

@@ -0,0 +1,52 @@
headers_initial = {
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"accept-language": "en-US,en;q=0.9",
"priority": "u=0, i",
"referer": "https://www.google.com/",
"sec-ch-prefers-color-scheme": "dark",
"sec-ch-ua": '"Chromium";v="130", "Google Chrome";v="130", "Not?A_Brand";v="99"',
"sec-ch-ua-arch": '"arm"',
"sec-ch-ua-bitness": '"64"',
"sec-ch-ua-form-factors": '"Desktop"',
"sec-ch-ua-full-version": '"130.0.6723.58"',
"sec-ch-ua-full-version-list": '"Chromium";v="130.0.6723.58", "Google Chrome";v="130.0.6723.58", "Not?A_Brand";v="99.0.0.0"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-model": '""',
"sec-ch-ua-platform": '"macOS"',
"sec-ch-ua-platform-version": '"15.0.1"',
"sec-ch-ua-wow64": "?0",
"sec-fetch-dest": "document",
"sec-fetch-mode": "navigate",
"sec-fetch-site": "same-origin",
"sec-fetch-user": "?1",
"upgrade-insecure-requests": "1",
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36",
"x-browser-channel": "stable",
"x-browser-copyright": "Copyright 2024 Google LLC. All rights reserved.",
"x-browser-year": "2024",
}
headers_jobs = {
"accept": "*/*",
"accept-language": "en-US,en;q=0.9",
"priority": "u=1, i",
"referer": "https://www.google.com/",
"sec-ch-prefers-color-scheme": "dark",
"sec-ch-ua": '"Chromium";v="130", "Google Chrome";v="130", "Not?A_Brand";v="99"',
"sec-ch-ua-arch": '"arm"',
"sec-ch-ua-bitness": '"64"',
"sec-ch-ua-form-factors": '"Desktop"',
"sec-ch-ua-full-version": '"130.0.6723.58"',
"sec-ch-ua-full-version-list": '"Chromium";v="130.0.6723.58", "Google Chrome";v="130.0.6723.58", "Not?A_Brand";v="99.0.0.0"',
"sec-ch-ua-mobile": "?0",
"sec-ch-ua-model": '""',
"sec-ch-ua-platform": '"macOS"',
"sec-ch-ua-platform-version": '"15.0.1"',
"sec-ch-ua-wow64": "?0",
"sec-fetch-dest": "empty",
"sec-fetch-mode": "cors",
"sec-fetch-site": "same-origin",
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/130.0.0.0 Safari/537.36",
}
async_param = "_basejs:/xjs/_/js/k=xjs.s.en_US.JwveA-JiKmg.2018.O/am=AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAIAAAAAAAAACAAAoICAAAAAAAKMAfAAAAIAQAAAAAAAAAAAAACCAAAEJDAAACAAAAAGABAIAAARBAAABAAAAAgAgQAABAASKAfv8JAAABAAAAAAwAQAQACQAAAAAAcAEAQABoCAAAABAAAIABAACAAAAEAAAAFAAAAAAAAAAAAAAAAAAAAAAAAACAQADoBwAAAAAAAAAAAAAQBAAAAATQAAoACOAHAAAAAAAAAQAAAIIAAAA_ZAACAAAAAAAAcB8APB4wHFJ4AAAAAAAAAAAAAAAACECCYA5If0EACAAAAAAAAAAAAAAAAAAAUgRNXG4AMAE/dg=0/br=1/rs=ACT90oGxMeaFMCopIHq5tuQM-6_3M_VMjQ,_basecss:/xjs/_/ss/k=xjs.s.IwsGu62EDtU.L.B1.O/am=QOoQIAQAAAQAREADEBAAAAAAAAAAAAAAAAAAAAAgAQAAIAAAgAQAAAIAIAIAoEwCAADIC8AfsgEAawwAPkAAjgoAGAAAAAAAAEADAAAAAAIgAECHAAAAAAAAAAABAQAggAARQAAAQCEAAAAAIAAAABgAAAAAIAQIACCAAfB-AAFIQABoCEA_CgEAAIABAACEgHAEwwAEFQAM4CgAAAAAAAAAAAAACABCAAAAQEAAABAgAMCPAAA4AoE2BAEAggSAAIoAQAAAAAgAAAAACCAQAAAxEwA_ZAACAAAAAAAAAAkAAAAAAAAgAAAAAAAAAAAAAAAAAAAAAAAAQAEAAAAAAAAAAAAAAAAAAAAAQA/br=1/rs=ACT90oGZc36t3uUQkj0srnIvvbHjO2hgyg,_basecomb:/xjs/_/js/k=xjs.s.en_US.JwveA-JiKmg.2018.O/ck=xjs.s.IwsGu62EDtU.L.B1.O/am=QOoQIAQAAAQAREADEBAAAAAAAAAAAAAAAAAAAAAgAQAAIAAAgAQAAAKAIAoIqEwCAADIK8AfsgEAawwAPkAAjgoAGAAACCAAAEJDAAACAAIgAGCHAIAAARBAAABBAQAggAgRQABAQSOAfv8JIAABABgAAAwAYAQICSCAAfB-cAFIQABoCEA_ChEAAIABAACEgHAEwwAEFQAM4CgAAAAAAAAAAAAACABCAACAQEDoBxAgAMCPAAA4AoE2BAEAggTQAIoASOAHAAgAAAAACSAQAIIxEwA_ZAACAAAAAAAAcB8APB4wHFJ4AAAAAAAAAAAAAAAACECCYA5If0EACAAAAAAAAAAAAAAAAAAAUgRNXG4AMAE/d=1/ed=1/dg=0/br=1/ujg=1/rs=ACT90oFNLTjPzD_OAqhhtXwe2pg1T3WpBg,_fmt:prog,_id:fc_5FwaZ86OKsfdwN4P4La3yA4_2"

41
jobspy/google/util.py Normal file
View File

@@ -0,0 +1,41 @@
import re
from jobspy.util import create_logger
log = create_logger("Google")
def find_job_info(jobs_data: list | dict) -> list | None:
"""Iterates through the JSON data to find the job listings"""
if isinstance(jobs_data, dict):
for key, value in jobs_data.items():
if key == "520084652" and isinstance(value, list):
return value
else:
result = find_job_info(value)
if result:
return result
elif isinstance(jobs_data, list):
for item in jobs_data:
result = find_job_info(item)
if result:
return result
return None
def find_job_info_initial_page(html_text: str):
pattern = f'520084652":(' + r"\[.*?\]\s*])\s*}\s*]\s*]\s*]\s*]\s*]"
results = []
matches = re.finditer(pattern, html_text)
import json
for match in matches:
try:
parsed_data = json.loads(match.group(1))
results.append(parsed_data)
except json.JSONDecodeError as e:
log.error(f"Failed to parse match: {str(e)}")
results.append({"raw_match": match.group(0), "error": str(e)})
return results

260
jobspy/indeed/__init__.py Normal file
View File

@@ -0,0 +1,260 @@
from __future__ import annotations
import math
from datetime import datetime
from typing import Tuple
from jobspy.indeed.constant import job_search_query, api_headers
from jobspy.indeed.util import is_job_remote, get_compensation, get_job_type
from jobspy.model import (
Scraper,
ScraperInput,
Site,
JobPost,
Location,
JobResponse,
JobType,
DescriptionFormat,
)
from jobspy.util import (
extract_emails_from_text,
markdown_converter,
create_session,
create_logger,
)
log = create_logger("Indeed")
class Indeed(Scraper):
def __init__(
self, proxies: list[str] | str | None = None, ca_cert: str | None = None
):
"""
Initializes IndeedScraper with the Indeed API url
"""
super().__init__(Site.INDEED, proxies=proxies)
self.session = create_session(
proxies=self.proxies, ca_cert=ca_cert, is_tls=False
)
self.scraper_input = None
self.jobs_per_page = 100
self.num_workers = 10
self.seen_urls = set()
self.headers = None
self.api_country_code = None
self.base_url = None
self.api_url = "https://apis.indeed.com/graphql"
def scrape(self, scraper_input: ScraperInput) -> JobResponse:
"""
Scrapes Indeed for jobs with scraper_input criteria
:param scraper_input:
:return: job_response
"""
self.scraper_input = scraper_input
domain, self.api_country_code = self.scraper_input.country.indeed_domain_value
self.base_url = f"https://{domain}.indeed.com"
self.headers = api_headers.copy()
self.headers["indeed-co"] = self.scraper_input.country.indeed_domain_value
job_list = []
page = 1
cursor = None
while len(self.seen_urls) < scraper_input.results_wanted + scraper_input.offset:
log.info(
f"search page: {page} / {math.ceil(scraper_input.results_wanted / self.jobs_per_page)}"
)
jobs, cursor = self._scrape_page(cursor)
if not jobs:
log.info(f"found no jobs on page: {page}")
break
job_list += jobs
page += 1
return JobResponse(
jobs=job_list[
scraper_input.offset : scraper_input.offset
+ scraper_input.results_wanted
]
)
def _scrape_page(self, cursor: str | None) -> Tuple[list[JobPost], str | None]:
"""
Scrapes a page of Indeed for jobs with scraper_input criteria
:param cursor:
:return: jobs found on page, next page cursor
"""
jobs = []
new_cursor = None
filters = self._build_filters()
search_term = (
self.scraper_input.search_term.replace('"', '\\"')
if self.scraper_input.search_term
else ""
)
query = job_search_query.format(
what=(f'what: "{search_term}"' if search_term else ""),
location=(
f'location: {{where: "{self.scraper_input.location}", radius: {self.scraper_input.distance}, radiusUnit: MILES}}'
if self.scraper_input.location
else ""
),
dateOnIndeed=self.scraper_input.hours_old,
cursor=f'cursor: "{cursor}"' if cursor else "",
filters=filters,
)
payload = {
"query": query,
}
api_headers_temp = api_headers.copy()
api_headers_temp["indeed-co"] = self.api_country_code
response = self.session.post(
self.api_url,
headers=api_headers_temp,
json=payload,
timeout=10,
verify=False,
)
if not response.ok:
log.info(
f"responded with status code: {response.status_code} (submit GitHub issue if this appears to be a bug)"
)
return jobs, new_cursor
data = response.json()
jobs = data["data"]["jobSearch"]["results"]
new_cursor = data["data"]["jobSearch"]["pageInfo"]["nextCursor"]
job_list = []
for job in jobs:
processed_job = self._process_job(job["job"])
if processed_job:
job_list.append(processed_job)
return job_list, new_cursor
def _build_filters(self):
"""
Builds the filters dict for job type/is_remote. If hours_old is provided, composite filter for job_type/is_remote is not possible.
IndeedApply: filters: { keyword: { field: "indeedApplyScope", keys: ["DESKTOP"] } }
"""
filters_str = ""
if self.scraper_input.hours_old:
filters_str = """
filters: {{
date: {{
field: "dateOnIndeed",
start: "{start}h"
}}
}}
""".format(
start=self.scraper_input.hours_old
)
elif self.scraper_input.easy_apply:
filters_str = """
filters: {
keyword: {
field: "indeedApplyScope",
keys: ["DESKTOP"]
}
}
"""
elif self.scraper_input.job_type or self.scraper_input.is_remote:
job_type_key_mapping = {
JobType.FULL_TIME: "CF3CP",
JobType.PART_TIME: "75GKK",
JobType.CONTRACT: "NJXCK",
JobType.INTERNSHIP: "VDTG7",
}
keys = []
if self.scraper_input.job_type:
key = job_type_key_mapping[self.scraper_input.job_type]
keys.append(key)
if self.scraper_input.is_remote:
keys.append("DSQF7")
if keys:
keys_str = '", "'.join(keys)
filters_str = f"""
filters: {{
composite: {{
filters: [{{
keyword: {{
field: "attributes",
keys: ["{keys_str}"]
}}
}}]
}}
}}
"""
return filters_str
def _process_job(self, job: dict) -> JobPost | None:
"""
Parses the job dict into JobPost model
:param job: dict to parse
:return: JobPost if it's a new job
"""
job_url = f'{self.base_url}/viewjob?jk={job["key"]}'
if job_url in self.seen_urls:
return
self.seen_urls.add(job_url)
description = job["description"]["html"]
if self.scraper_input.description_format == DescriptionFormat.MARKDOWN:
description = markdown_converter(description)
job_type = get_job_type(job["attributes"])
timestamp_seconds = job["datePublished"] / 1000
date_posted = datetime.fromtimestamp(timestamp_seconds).strftime("%Y-%m-%d")
employer = job["employer"].get("dossier") if job["employer"] else None
employer_details = employer.get("employerDetails", {}) if employer else {}
rel_url = job["employer"]["relativeCompanyPageUrl"] if job["employer"] else None
return JobPost(
id=f'in-{job["key"]}',
title=job["title"],
description=description,
company_name=job["employer"].get("name") if job.get("employer") else None,
company_url=(f"{self.base_url}{rel_url}" if job["employer"] else None),
company_url_direct=(
employer["links"]["corporateWebsite"] if employer else None
),
location=Location(
city=job.get("location", {}).get("city"),
state=job.get("location", {}).get("admin1Code"),
country=job.get("location", {}).get("countryCode"),
),
job_type=job_type,
compensation=get_compensation(job["compensation"]),
date_posted=date_posted,
job_url=job_url,
job_url_direct=(
job["recruit"].get("viewJobUrl") if job.get("recruit") else None
),
emails=extract_emails_from_text(description) if description else None,
is_remote=is_job_remote(job, description),
company_addresses=(
employer_details["addresses"][0]
if employer_details.get("addresses")
else None
),
company_industry=(
employer_details["industry"]
.replace("Iv1", "")
.replace("_", " ")
.title()
.strip()
if employer_details.get("industry")
else None
),
company_num_employees=employer_details.get("employeesLocalizedLabel"),
company_revenue=employer_details.get("revenueLocalizedLabel"),
company_description=employer_details.get("briefDescription"),
company_logo=(
employer["images"].get("squareLogoUrl")
if employer and employer.get("images")
else None
),
)

109
jobspy/indeed/constant.py Normal file
View File

@@ -0,0 +1,109 @@
job_search_query = """
query GetJobData {{
jobSearch(
{what}
{location}
limit: 100
{cursor}
sort: RELEVANCE
{filters}
) {{
pageInfo {{
nextCursor
}}
results {{
trackingKey
job {{
source {{
name
}}
key
title
datePublished
dateOnIndeed
description {{
html
}}
location {{
countryName
countryCode
admin1Code
city
postalCode
streetAddress
formatted {{
short
long
}}
}}
compensation {{
estimated {{
currencyCode
baseSalary {{
unitOfWork
range {{
... on Range {{
min
max
}}
}}
}}
}}
baseSalary {{
unitOfWork
range {{
... on Range {{
min
max
}}
}}
}}
currencyCode
}}
attributes {{
key
label
}}
employer {{
relativeCompanyPageUrl
name
dossier {{
employerDetails {{
addresses
industry
employeesLocalizedLabel
revenueLocalizedLabel
briefDescription
ceoName
ceoPhotoUrl
}}
images {{
headerImageUrl
squareLogoUrl
}}
links {{
corporateWebsite
}}
}}
}}
recruit {{
viewJobUrl
detailedSalary
workSchedule
}}
}}
}}
}}
}}
"""
api_headers = {
"Host": "apis.indeed.com",
"content-type": "application/json",
"indeed-api-key": "161092c2017b5bbab13edb12461a62d5a833871e7cad6d9d475304573de67ac8",
"accept": "application/json",
"indeed-locale": "en-US",
"accept-language": "en-US,en;q=0.9",
"user-agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 16_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 Indeed App 193.1",
"indeed-app-info": "appv=193.1; appid=com.indeed.jobsearch; osv=16.6.1; os=ios; dtype=phone",
}

83
jobspy/indeed/util.py Normal file
View File

@@ -0,0 +1,83 @@
from jobspy.model import CompensationInterval, JobType, Compensation
from jobspy.util import get_enum_from_job_type
def get_job_type(attributes: list) -> list[JobType]:
"""
Parses the attributes to get list of job types
:param attributes:
:return: list of JobType
"""
job_types: list[JobType] = []
for attribute in attributes:
job_type_str = attribute["label"].replace("-", "").replace(" ", "").lower()
job_type = get_enum_from_job_type(job_type_str)
if job_type:
job_types.append(job_type)
return job_types
def get_compensation(compensation: dict) -> Compensation | None:
"""
Parses the job to get compensation
:param sssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssssrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrrompensation:
:return: compensation object
"""
if not compensation["baseSalary"] and not compensation["estimated"]:
return None
comp = (
compensation["baseSalary"]
if compensation["baseSalary"]
else compensation["estimated"]["baseSalary"]
)
if not comp:
return None
interval = get_compensation_interval(comp["unitOfWork"])
if not interval:
return None
min_range = comp["range"].get("min")
max_range = comp["range"].get("max")
return Compensation(
interval=interval,
min_amount=int(min_range) if min_range is not None else None,
max_amount=int(max_range) if max_range is not None else None,
currency=(
compensation["estimated"]["currencyCode"]
if compensation["estimated"]
else compensation["currencyCode"]
),
)
def is_job_remote(job: dict, description: str) -> bool:
"""
Searches the description, location, and attributes to check if job is remote
"""
remote_keywords = ["remote", "work from home", "wfh"]
is_remote_in_attributes = any(
any(keyword in attr["label"].lower() for keyword in remote_keywords)
for attr in job["attributes"]
)
is_remote_in_description = any(
keyword in description.lower() for keyword in remote_keywords
)
is_remote_in_location = any(
keyword in job["location"]["formatted"]["long"].lower()
for keyword in remote_keywords
)
return is_remote_in_attributes or is_remote_in_description or is_remote_in_location
def get_compensation_interval(interval: str) -> CompensationInterval:
interval_mapping = {
"DAY": "DAILY",
"YEAR": "YEARLY",
"HOUR": "HOURLY",
"WEEK": "WEEKLY",
"MONTH": "MONTHLY",
}
mapped_interval = interval_mapping.get(interval.upper(), None)
if mapped_interval and mapped_interval in CompensationInterval.__members__:
return CompensationInterval[mapped_interval]
else:
raise ValueError(f"Unsupported interval: {interval}")

335
jobspy/linkedin/__init__.py Normal file
View File

@@ -0,0 +1,335 @@
from __future__ import annotations
import math
import random
import time
from datetime import datetime
from typing import Optional
from urllib.parse import urlparse, urlunparse, unquote
import regex as re
from bs4 import BeautifulSoup
from bs4.element import Tag
from jobspy.exception import LinkedInException
from jobspy.linkedin.constant import headers
from jobspy.linkedin.util import (
job_type_code,
parse_job_type,
parse_job_level,
parse_company_industry,
)
from jobspy.model import (
JobPost,
Location,
JobResponse,
Country,
Compensation,
DescriptionFormat,
Scraper,
ScraperInput,
Site,
)
from jobspy.util import (
extract_emails_from_text,
currency_parser,
markdown_converter,
create_session,
remove_attributes,
create_logger,
)
log = create_logger("LinkedIn")
class LinkedIn(Scraper):
base_url = "https://www.linkedin.com"
delay = 3
band_delay = 4
jobs_per_page = 25
def __init__(
self, proxies: list[str] | str | None = None, ca_cert: str | None = None
):
"""
Initializes LinkedInScraper with the LinkedIn job search url
"""
super().__init__(Site.LINKEDIN, proxies=proxies, ca_cert=ca_cert)
self.session = create_session(
proxies=self.proxies,
ca_cert=ca_cert,
is_tls=False,
has_retry=True,
delay=5,
clear_cookies=True,
)
self.session.headers.update(headers)
self.scraper_input = None
self.country = "worldwide"
self.job_url_direct_regex = re.compile(r'(?<=\?url=)[^"]+')
def scrape(self, scraper_input: ScraperInput) -> JobResponse:
"""
Scrapes LinkedIn for jobs with scraper_input criteria
:param scraper_input:
:return: job_response
"""
self.scraper_input = scraper_input
job_list: list[JobPost] = []
seen_ids = set()
start = scraper_input.offset // 10 * 10 if scraper_input.offset else 0
request_count = 0
seconds_old = (
scraper_input.hours_old * 3600 if scraper_input.hours_old else None
)
continue_search = (
lambda: len(job_list) < scraper_input.results_wanted and start < 1000
)
while continue_search():
request_count += 1
log.info(
f"search page: {request_count} / {math.ceil(scraper_input.results_wanted / 10)}"
)
params = {
"keywords": scraper_input.search_term,
"location": scraper_input.location,
"distance": scraper_input.distance,
"f_WT": 2 if scraper_input.is_remote else None,
"f_JT": (
job_type_code(scraper_input.job_type)
if scraper_input.job_type
else None
),
"pageNum": 0,
"start": start,
"f_AL": "true" if scraper_input.easy_apply else None,
"f_C": (
",".join(map(str, scraper_input.linkedin_company_ids))
if scraper_input.linkedin_company_ids
else None
),
}
if seconds_old is not None:
params["f_TPR"] = f"r{seconds_old}"
params = {k: v for k, v in params.items() if v is not None}
try:
response = self.session.get(
f"{self.base_url}/jobs-guest/jobs/api/seeMoreJobPostings/search?",
params=params,
timeout=10,
)
if response.status_code not in range(200, 400):
if response.status_code == 429:
err = (
f"429 Response - Blocked by LinkedIn for too many requests"
)
else:
err = f"LinkedIn response status code {response.status_code}"
err += f" - {response.text}"
log.error(err)
return JobResponse(jobs=job_list)
except Exception as e:
if "Proxy responded with" in str(e):
log.error(f"LinkedIn: Bad proxy")
else:
log.error(f"LinkedIn: {str(e)}")
return JobResponse(jobs=job_list)
soup = BeautifulSoup(response.text, "html.parser")
job_cards = soup.find_all("div", class_="base-search-card")
if len(job_cards) == 0:
return JobResponse(jobs=job_list)
for job_card in job_cards:
href_tag = job_card.find("a", class_="base-card__full-link")
if href_tag and "href" in href_tag.attrs:
href = href_tag.attrs["href"].split("?")[0]
job_id = href.split("-")[-1]
if job_id in seen_ids:
continue
seen_ids.add(job_id)
try:
fetch_desc = scraper_input.linkedin_fetch_description
job_post = self._process_job(job_card, job_id, fetch_desc)
if job_post:
job_list.append(job_post)
if not continue_search():
break
except Exception as e:
raise LinkedInException(str(e))
if continue_search():
time.sleep(random.uniform(self.delay, self.delay + self.band_delay))
start += len(job_list)
job_list = job_list[: scraper_input.results_wanted]
return JobResponse(jobs=job_list)
def _process_job(
self, job_card: Tag, job_id: str, full_descr: bool
) -> Optional[JobPost]:
salary_tag = job_card.find("span", class_="job-search-card__salary-info")
compensation = None
if salary_tag:
salary_text = salary_tag.get_text(separator=" ").strip()
salary_values = [currency_parser(value) for value in salary_text.split("-")]
salary_min = salary_values[0]
salary_max = salary_values[1]
currency = salary_text[0] if salary_text[0] != "$" else "USD"
compensation = Compensation(
min_amount=int(salary_min),
max_amount=int(salary_max),
currency=currency,
)
title_tag = job_card.find("span", class_="sr-only")
title = title_tag.get_text(strip=True) if title_tag else "N/A"
company_tag = job_card.find("h4", class_="base-search-card__subtitle")
company_a_tag = company_tag.find("a") if company_tag else None
company_url = (
urlunparse(urlparse(company_a_tag.get("href"))._replace(query=""))
if company_a_tag and company_a_tag.has_attr("href")
else ""
)
company = company_a_tag.get_text(strip=True) if company_a_tag else "N/A"
metadata_card = job_card.find("div", class_="base-search-card__metadata")
location = self._get_location(metadata_card)
datetime_tag = (
metadata_card.find("time", class_="job-search-card__listdate")
if metadata_card
else None
)
date_posted = None
if datetime_tag and "datetime" in datetime_tag.attrs:
datetime_str = datetime_tag["datetime"]
try:
date_posted = datetime.strptime(datetime_str, "%Y-%m-%d")
except:
date_posted = None
job_details = {}
if full_descr:
job_details = self._get_job_details(job_id)
return JobPost(
id=f"li-{job_id}",
title=title,
company_name=company,
company_url=company_url,
location=location,
date_posted=date_posted,
job_url=f"{self.base_url}/jobs/view/{job_id}",
compensation=compensation,
job_type=job_details.get("job_type"),
job_level=job_details.get("job_level", "").lower(),
company_industry=job_details.get("company_industry"),
description=job_details.get("description"),
job_url_direct=job_details.get("job_url_direct"),
emails=extract_emails_from_text(job_details.get("description")),
company_logo=job_details.get("company_logo"),
job_function=job_details.get("job_function"),
)
def _get_job_details(self, job_id: str) -> dict:
"""
Retrieves job description and other job details by going to the job page url
:param job_page_url:
:return: dict
"""
try:
response = self.session.get(
f"{self.base_url}/jobs/view/{job_id}", timeout=5
)
response.raise_for_status()
except:
return {}
if "linkedin.com/signup" in response.url:
return {}
soup = BeautifulSoup(response.text, "html.parser")
div_content = soup.find(
"div", class_=lambda x: x and "show-more-less-html__markup" in x
)
description = None
if div_content is not None:
div_content = remove_attributes(div_content)
description = div_content.prettify(formatter="html")
if self.scraper_input.description_format == DescriptionFormat.MARKDOWN:
description = markdown_converter(description)
h3_tag = soup.find(
"h3", text=lambda text: text and "Job function" in text.strip()
)
job_function = None
if h3_tag:
job_function_span = h3_tag.find_next(
"span", class_="description__job-criteria-text"
)
if job_function_span:
job_function = job_function_span.text.strip()
company_logo = (
logo_image.get("data-delayed-url")
if (logo_image := soup.find("img", {"class": "artdeco-entity-image"}))
else None
)
return {
"description": description,
"job_level": parse_job_level(soup),
"company_industry": parse_company_industry(soup),
"job_type": parse_job_type(soup),
"job_url_direct": self._parse_job_url_direct(soup),
"company_logo": company_logo,
"job_function": job_function,
}
def _get_location(self, metadata_card: Optional[Tag]) -> Location:
"""
Extracts the location data from the job metadata card.
:param metadata_card
:return: location
"""
location = Location(country=Country.from_string(self.country))
if metadata_card is not None:
location_tag = metadata_card.find(
"span", class_="job-search-card__location"
)
location_string = location_tag.text.strip() if location_tag else "N/A"
parts = location_string.split(", ")
if len(parts) == 2:
city, state = parts
location = Location(
city=city,
state=state,
country=Country.from_string(self.country),
)
elif len(parts) == 3:
city, state, country = parts
country = Country.from_string(country)
location = Location(city=city, state=state, country=country)
return location
def _parse_job_url_direct(self, soup: BeautifulSoup) -> str | None:
"""
Gets the job url direct from job page
:param soup:
:return: str
"""
job_url_direct = None
job_url_direct_content = soup.find("code", id="applyUrl")
if job_url_direct_content:
job_url_direct_match = self.job_url_direct_regex.search(
job_url_direct_content.decode_contents().strip()
)
if job_url_direct_match:
job_url_direct = unquote(job_url_direct_match.group())
return job_url_direct

View File

@@ -0,0 +1,8 @@
headers = {
"authority": "www.linkedin.com",
"accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7",
"accept-language": "en-US,en;q=0.9",
"cache-control": "max-age=0",
"upgrade-insecure-requests": "1",
"user-agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
}

85
jobspy/linkedin/util.py Normal file
View File

@@ -0,0 +1,85 @@
from bs4 import BeautifulSoup
from jobspy.model import JobType
from jobspy.util import get_enum_from_job_type
def job_type_code(job_type_enum: JobType) -> str:
return {
JobType.FULL_TIME: "F",
JobType.PART_TIME: "P",
JobType.INTERNSHIP: "I",
JobType.CONTRACT: "C",
JobType.TEMPORARY: "T",
}.get(job_type_enum, "")
def parse_job_type(soup_job_type: BeautifulSoup) -> list[JobType] | None:
"""
Gets the job type from job page
:param soup_job_type:
:return: JobType
"""
h3_tag = soup_job_type.find(
"h3",
class_="description__job-criteria-subheader",
string=lambda text: "Employment type" in text,
)
employment_type = None
if h3_tag:
employment_type_span = h3_tag.find_next_sibling(
"span",
class_="description__job-criteria-text description__job-criteria-text--criteria",
)
if employment_type_span:
employment_type = employment_type_span.get_text(strip=True)
employment_type = employment_type.lower()
employment_type = employment_type.replace("-", "")
return [get_enum_from_job_type(employment_type)] if employment_type else []
def parse_job_level(soup_job_level: BeautifulSoup) -> str | None:
"""
Gets the job level from job page
:param soup_job_level:
:return: str
"""
h3_tag = soup_job_level.find(
"h3",
class_="description__job-criteria-subheader",
string=lambda text: "Seniority level" in text,
)
job_level = None
if h3_tag:
job_level_span = h3_tag.find_next_sibling(
"span",
class_="description__job-criteria-text description__job-criteria-text--criteria",
)
if job_level_span:
job_level = job_level_span.get_text(strip=True)
return job_level
def parse_company_industry(soup_industry: BeautifulSoup) -> str | None:
"""
Gets the company industry from job page
:param soup_industry:
:return: str
"""
h3_tag = soup_industry.find(
"h3",
class_="description__job-criteria-subheader",
string=lambda text: "Industries" in text,
)
industry = None
if h3_tag:
industry_span = h3_tag.find_next_sibling(
"span",
class_="description__job-criteria-text description__job-criteria-text--criteria",
)
if industry_span:
industry = industry_span.get_text(strip=True)
return industry

314
jobspy/model.py Normal file
View File

@@ -0,0 +1,314 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import Optional
from datetime import date
from enum import Enum
from pydantic import BaseModel
class JobType(Enum):
FULL_TIME = (
"fulltime",
"períodointegral",
"estágio/trainee",
"cunormăîntreagă",
"tiempocompleto",
"vollzeit",
"voltijds",
"tempointegral",
"全职",
"plnýúvazek",
"fuldtid",
"دوامكامل",
"kokopäivätyö",
"tempsplein",
"vollzeit",
"πλήρηςαπασχόληση",
"teljesmunkaidő",
"tempopieno",
"tempsplein",
"heltid",
"jornadacompleta",
"pełnyetat",
"정규직",
"100%",
"全職",
"งานประจำ",
"tamzamanlı",
"повназайнятість",
"toànthờigian",
)
PART_TIME = ("parttime", "teilzeit", "částečnýúvazek", "deltid")
CONTRACT = ("contract", "contractor")
TEMPORARY = ("temporary",)
INTERNSHIP = (
"internship",
"prácticas",
"ojt(onthejobtraining)",
"praktikum",
"praktik",
)
PER_DIEM = ("perdiem",)
NIGHTS = ("nights",)
OTHER = ("other",)
SUMMER = ("summer",)
VOLUNTEER = ("volunteer",)
class Country(Enum):
"""
Gets the subdomain for Indeed and Glassdoor.
The second item in the tuple is the subdomain (and API country code if there's a ':' separator) for Indeed
The third item in the tuple is the subdomain (and tld if there's a ':' separator) for Glassdoor
"""
ARGENTINA = ("argentina", "ar", "com.ar")
AUSTRALIA = ("australia", "au", "com.au")
AUSTRIA = ("austria", "at", "at")
BAHRAIN = ("bahrain", "bh")
BELGIUM = ("belgium", "be", "fr:be")
BRAZIL = ("brazil", "br", "com.br")
CANADA = ("canada", "ca", "ca")
CHILE = ("chile", "cl")
CHINA = ("china", "cn")
COLOMBIA = ("colombia", "co")
COSTARICA = ("costa rica", "cr")
CZECHREPUBLIC = ("czech republic,czechia", "cz")
DENMARK = ("denmark", "dk")
ECUADOR = ("ecuador", "ec")
EGYPT = ("egypt", "eg")
FINLAND = ("finland", "fi")
FRANCE = ("france", "fr", "fr")
GERMANY = ("germany", "de", "de")
GREECE = ("greece", "gr")
HONGKONG = ("hong kong", "hk", "com.hk")
HUNGARY = ("hungary", "hu")
INDIA = ("india", "in", "co.in")
INDONESIA = ("indonesia", "id")
IRELAND = ("ireland", "ie", "ie")
ISRAEL = ("israel", "il")
ITALY = ("italy", "it", "it")
JAPAN = ("japan", "jp")
KUWAIT = ("kuwait", "kw")
LUXEMBOURG = ("luxembourg", "lu")
MALAYSIA = ("malaysia", "malaysia:my", "com")
MALTA = ("malta", "malta:mt", "mt")
MEXICO = ("mexico", "mx", "com.mx")
MOROCCO = ("morocco", "ma")
NETHERLANDS = ("netherlands", "nl", "nl")
NEWZEALAND = ("new zealand", "nz", "co.nz")
NIGERIA = ("nigeria", "ng")
NORWAY = ("norway", "no")
OMAN = ("oman", "om")
PAKISTAN = ("pakistan", "pk")
PANAMA = ("panama", "pa")
PERU = ("peru", "pe")
PHILIPPINES = ("philippines", "ph")
POLAND = ("poland", "pl")
PORTUGAL = ("portugal", "pt")
QATAR = ("qatar", "qa")
ROMANIA = ("romania", "ro")
SAUDIARABIA = ("saudi arabia", "sa")
SINGAPORE = ("singapore", "sg", "sg")
SOUTHAFRICA = ("south africa", "za")
SOUTHKOREA = ("south korea", "kr")
SPAIN = ("spain", "es", "es")
SWEDEN = ("sweden", "se")
SWITZERLAND = ("switzerland", "ch", "de:ch")
TAIWAN = ("taiwan", "tw")
THAILAND = ("thailand", "th")
TURKEY = ("türkiye,turkey", "tr")
UKRAINE = ("ukraine", "ua")
UNITEDARABEMIRATES = ("united arab emirates", "ae")
UK = ("uk,united kingdom", "uk:gb", "co.uk")
USA = ("usa,us,united states", "www:us", "com")
URUGUAY = ("uruguay", "uy")
VENEZUELA = ("venezuela", "ve")
VIETNAM = ("vietnam", "vn", "com")
# internal for ziprecruiter
US_CANADA = ("usa/ca", "www")
# internal for linkedin
WORLDWIDE = ("worldwide", "www")
@property
def indeed_domain_value(self):
subdomain, _, api_country_code = self.value[1].partition(":")
if subdomain and api_country_code:
return subdomain, api_country_code.upper()
return self.value[1], self.value[1].upper()
@property
def glassdoor_domain_value(self):
if len(self.value) == 3:
subdomain, _, domain = self.value[2].partition(":")
if subdomain and domain:
return f"{subdomain}.glassdoor.{domain}"
else:
return f"www.glassdoor.{self.value[2]}"
else:
raise Exception(f"Glassdoor is not available for {self.name}")
def get_glassdoor_url(self):
return f"https://{self.glassdoor_domain_value}/"
@classmethod
def from_string(cls, country_str: str):
"""Convert a string to the corresponding Country enum."""
country_str = country_str.strip().lower()
for country in cls:
country_names = country.value[0].split(",")
if country_str in country_names:
return country
valid_countries = [country.value for country in cls]
raise ValueError(
f"Invalid country string: '{country_str}'. Valid countries are: {', '.join([country[0] for country in valid_countries])}"
)
class Location(BaseModel):
country: Country | str | None = None
city: Optional[str] = None
state: Optional[str] = None
def display_location(self) -> str:
location_parts = []
if self.city:
location_parts.append(self.city)
if self.state:
location_parts.append(self.state)
if isinstance(self.country, str):
location_parts.append(self.country)
elif self.country and self.country not in (
Country.US_CANADA,
Country.WORLDWIDE,
):
country_name = self.country.value[0]
if "," in country_name:
country_name = country_name.split(",")[0]
if country_name in ("usa", "uk"):
location_parts.append(country_name.upper())
else:
location_parts.append(country_name.title())
return ", ".join(location_parts)
class CompensationInterval(Enum):
YEARLY = "yearly"
MONTHLY = "monthly"
WEEKLY = "weekly"
DAILY = "daily"
HOURLY = "hourly"
@classmethod
def get_interval(cls, pay_period):
interval_mapping = {
"YEAR": cls.YEARLY,
"HOUR": cls.HOURLY,
}
if pay_period in interval_mapping:
return interval_mapping[pay_period].value
else:
return cls[pay_period].value if pay_period in cls.__members__ else None
class Compensation(BaseModel):
interval: Optional[CompensationInterval] = None
min_amount: float | None = None
max_amount: float | None = None
currency: Optional[str] = "USD"
class DescriptionFormat(Enum):
MARKDOWN = "markdown"
HTML = "html"
class JobPost(BaseModel):
id: str | None = None
title: str
company_name: str | None
job_url: str
job_url_direct: str | None = None
location: Optional[Location]
description: str | None = None
company_url: str | None = None
company_url_direct: str | None = None
job_type: list[JobType] | None = None
compensation: Compensation | None = None
date_posted: date | None = None
emails: list[str] | None = None
is_remote: bool | None = None
listing_type: str | None = None
# linkedin specific
job_level: str | None = None
# linkedin and indeed specific
company_industry: str | None = None
# indeed specific
company_addresses: str | None = None
company_num_employees: str | None = None
company_revenue: str | None = None
company_description: str | None = None
company_logo: str | None = None
banner_photo_url: str | None = None
# linkedin only atm
job_function: str | None = None
class JobResponse(BaseModel):
jobs: list[JobPost] = []
class Site(Enum):
LINKEDIN = "linkedin"
INDEED = "indeed"
ZIP_RECRUITER = "zip_recruiter"
GLASSDOOR = "glassdoor"
GOOGLE = "google"
BAYT = "bayt"
class SalarySource(Enum):
DIRECT_DATA = "direct_data"
DESCRIPTION = "description"
class ScraperInput(BaseModel):
site_type: list[Site]
search_term: str | None = None
google_search_term: str | None = None
location: str | None = None
country: Country | None = Country.USA
distance: int | None = None
is_remote: bool = False
job_type: JobType | None = None
easy_apply: bool | None = None
offset: int = 0
linkedin_fetch_description: bool = False
linkedin_company_ids: list[int] | None = None
description_format: DescriptionFormat | None = DescriptionFormat.MARKDOWN
results_wanted: int = 15
hours_old: int | None = None
class Scraper(ABC):
def __init__(
self, site: Site, proxies: list[str] | None = None, ca_cert: str | None = None
):
self.site = site
self.proxies = proxies
self.ca_cert = ca_cert
@abstractmethod
def scrape(self, scraper_input: ScraperInput) -> JobResponse: ...

347
jobspy/util.py Normal file
View File

@@ -0,0 +1,347 @@
from __future__ import annotations
import logging
import re
from itertools import cycle
import numpy as np
import requests
import tls_client
import urllib3
from markdownify import markdownify as md
from requests.adapters import HTTPAdapter, Retry
from jobspy.model import CompensationInterval, JobType, Site
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def create_logger(name: str):
logger = logging.getLogger(f"JobSpy:{name}")
logger.propagate = False
if not logger.handlers:
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
format = "%(asctime)s - %(levelname)s - %(name)s - %(message)s"
formatter = logging.Formatter(format)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
return logger
class RotatingProxySession:
def __init__(self, proxies=None):
if isinstance(proxies, str):
self.proxy_cycle = cycle([self.format_proxy(proxies)])
elif isinstance(proxies, list):
self.proxy_cycle = (
cycle([self.format_proxy(proxy) for proxy in proxies])
if proxies
else None
)
else:
self.proxy_cycle = None
@staticmethod
def format_proxy(proxy):
"""Utility method to format a proxy string into a dictionary."""
if proxy.startswith("http://") or proxy.startswith("https://"):
return {"http": proxy, "https": proxy}
return {"http": f"http://{proxy}", "https": f"http://{proxy}"}
class RequestsRotating(RotatingProxySession, requests.Session):
def __init__(self, proxies=None, has_retry=False, delay=1, clear_cookies=False):
RotatingProxySession.__init__(self, proxies=proxies)
requests.Session.__init__(self)
self.clear_cookies = clear_cookies
self.allow_redirects = True
self.setup_session(has_retry, delay)
def setup_session(self, has_retry, delay):
if has_retry:
retries = Retry(
total=3,
connect=3,
status=3,
status_forcelist=[500, 502, 503, 504, 429],
backoff_factor=delay,
)
adapter = HTTPAdapter(max_retries=retries)
self.mount("http://", adapter)
self.mount("https://", adapter)
def request(self, method, url, **kwargs):
if self.clear_cookies:
self.cookies.clear()
if self.proxy_cycle:
next_proxy = next(self.proxy_cycle)
if next_proxy["http"] != "http://localhost":
self.proxies = next_proxy
else:
self.proxies = {}
return requests.Session.request(self, method, url, **kwargs)
class TLSRotating(RotatingProxySession, tls_client.Session):
def __init__(self, proxies=None):
RotatingProxySession.__init__(self, proxies=proxies)
tls_client.Session.__init__(self, random_tls_extension_order=True)
def execute_request(self, *args, **kwargs):
if self.proxy_cycle:
next_proxy = next(self.proxy_cycle)
if next_proxy["http"] != "http://localhost":
self.proxies = next_proxy
else:
self.proxies = {}
response = tls_client.Session.execute_request(self, *args, **kwargs)
response.ok = response.status_code in range(200, 400)
return response
def create_session(
*,
proxies: dict | str | None = None,
ca_cert: str | None = None,
is_tls: bool = True,
has_retry: bool = False,
delay: int = 1,
clear_cookies: bool = False,
) -> requests.Session:
"""
Creates a requests session with optional tls, proxy, and retry settings.
:return: A session object
"""
if is_tls:
session = TLSRotating(proxies=proxies)
else:
session = RequestsRotating(
proxies=proxies,
has_retry=has_retry,
delay=delay,
clear_cookies=clear_cookies,
)
if ca_cert:
session.verify = ca_cert
return session
def set_logger_level(verbose: int):
"""
Adjusts the logger's level. This function allows the logging level to be changed at runtime.
Parameters:
- verbose: int {0, 1, 2} (default=2, all logs)
"""
if verbose is None:
return
level_name = {2: "INFO", 1: "WARNING", 0: "ERROR"}.get(verbose, "INFO")
level = getattr(logging, level_name.upper(), None)
if level is not None:
for logger_name in logging.root.manager.loggerDict:
if logger_name.startswith("JobSpy:"):
logging.getLogger(logger_name).setLevel(level)
else:
raise ValueError(f"Invalid log level: {level_name}")
def markdown_converter(description_html: str):
if description_html is None:
return None
markdown = md(description_html)
return markdown.strip()
def extract_emails_from_text(text: str) -> list[str] | None:
if not text:
return None
email_regex = re.compile(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}")
return email_regex.findall(text)
def get_enum_from_job_type(job_type_str: str) -> JobType | None:
"""
Given a string, returns the corresponding JobType enum member if a match is found.
"""
res = None
for job_type in JobType:
if job_type_str in job_type.value:
res = job_type
return res
def currency_parser(cur_str):
# Remove any non-numerical characters
# except for ',' '.' or '-' (e.g. EUR)
cur_str = re.sub("[^-0-9.,]", "", cur_str)
# Remove any 000s separators (either , or .)
cur_str = re.sub("[.,]", "", cur_str[:-3]) + cur_str[-3:]
if "." in list(cur_str[-3:]):
num = float(cur_str)
elif "," in list(cur_str[-3:]):
num = float(cur_str.replace(",", "."))
else:
num = float(cur_str)
return np.round(num, 2)
def remove_attributes(tag):
for attr in list(tag.attrs):
del tag[attr]
return tag
def extract_salary(
salary_str,
lower_limit=1000,
upper_limit=700000,
hourly_threshold=350,
monthly_threshold=30000,
enforce_annual_salary=False,
):
"""
Extracts salary information from a string and returns the salary interval, min and max salary values, and currency.
(TODO: Needs test cases as the regex is complicated and may not cover all edge cases)
"""
if not salary_str:
return None, None, None, None
annual_max_salary = None
min_max_pattern = r"\$(\d+(?:,\d+)?(?:\.\d+)?)([kK]?)\s*[-—–]\s*(?:\$)?(\d+(?:,\d+)?(?:\.\d+)?)([kK]?)"
def to_int(s):
return int(float(s.replace(",", "")))
def convert_hourly_to_annual(hourly_wage):
return hourly_wage * 2080
def convert_monthly_to_annual(monthly_wage):
return monthly_wage * 12
match = re.search(min_max_pattern, salary_str)
if match:
min_salary = to_int(match.group(1))
max_salary = to_int(match.group(3))
# Handle 'k' suffix for min and max salaries independently
if "k" in match.group(2).lower() or "k" in match.group(4).lower():
min_salary *= 1000
max_salary *= 1000
# Convert to annual if less than the hourly threshold
if min_salary < hourly_threshold:
interval = CompensationInterval.HOURLY.value
annual_min_salary = convert_hourly_to_annual(min_salary)
if max_salary < hourly_threshold:
annual_max_salary = convert_hourly_to_annual(max_salary)
elif min_salary < monthly_threshold:
interval = CompensationInterval.MONTHLY.value
annual_min_salary = convert_monthly_to_annual(min_salary)
if max_salary < monthly_threshold:
annual_max_salary = convert_monthly_to_annual(max_salary)
else:
interval = CompensationInterval.YEARLY.value
annual_min_salary = min_salary
annual_max_salary = max_salary
# Ensure salary range is within specified limits
if not annual_max_salary:
return None, None, None, None
if (
lower_limit <= annual_min_salary <= upper_limit
and lower_limit <= annual_max_salary <= upper_limit
and annual_min_salary < annual_max_salary
):
if enforce_annual_salary:
return interval, annual_min_salary, annual_max_salary, "USD"
else:
return interval, min_salary, max_salary, "USD"
return None, None, None, None
def extract_job_type(description: str):
if not description:
return []
keywords = {
JobType.FULL_TIME: r"full\s?time",
JobType.PART_TIME: r"part\s?time",
JobType.INTERNSHIP: r"internship",
JobType.CONTRACT: r"contract",
}
listing_types = []
for key, pattern in keywords.items():
if re.search(pattern, description, re.IGNORECASE):
listing_types.append(key)
return listing_types if listing_types else None
def map_str_to_site(site_name: str) -> Site:
return Site[site_name.upper()]
def get_enum_from_value(value_str):
for job_type in JobType:
if value_str in job_type.value:
return job_type
raise Exception(f"Invalid job type: {value_str}")
def convert_to_annual(job_data: dict):
if job_data["interval"] == "hourly":
job_data["min_amount"] *= 2080
job_data["max_amount"] *= 2080
if job_data["interval"] == "monthly":
job_data["min_amount"] *= 12
job_data["max_amount"] *= 12
if job_data["interval"] == "weekly":
job_data["min_amount"] *= 52
job_data["max_amount"] *= 52
if job_data["interval"] == "daily":
job_data["min_amount"] *= 260
job_data["max_amount"] *= 260
job_data["interval"] = "yearly"
desired_order = [
"id",
"site",
"job_url",
"job_url_direct",
"title",
"company",
"location",
"date_posted",
"job_type",
"salary_source",
"interval",
"min_amount",
"max_amount",
"currency",
"is_remote",
"job_level",
"job_function",
"listing_type",
"emails",
"description",
"company_industry",
"company_url",
"company_logo",
"company_url_direct",
"company_addresses",
"company_num_employees",
"company_revenue",
"company_description",
]

View File

@@ -0,0 +1,219 @@
from __future__ import annotations
import json
import math
import re
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from bs4 import BeautifulSoup
from jobspy.ziprecruiter.constant import headers, get_cookie_data
from jobspy.util import (
extract_emails_from_text,
create_session,
markdown_converter,
remove_attributes,
create_logger,
)
from jobspy.model import (
JobPost,
Compensation,
Location,
JobResponse,
Country,
DescriptionFormat,
Scraper,
ScraperInput,
Site,
)
from jobspy.ziprecruiter.util import get_job_type_enum, add_params
log = create_logger("ZipRecruiter")
class ZipRecruiter(Scraper):
base_url = "https://www.ziprecruiter.com"
api_url = "https://api.ziprecruiter.com"
def __init__(
self, proxies: list[str] | str | None = None, ca_cert: str | None = None
):
"""
Initializes ZipRecruiterScraper with the ZipRecruiter job search url
"""
super().__init__(Site.ZIP_RECRUITER, proxies=proxies)
self.scraper_input = None
self.session = create_session(proxies=proxies, ca_cert=ca_cert)
self.session.headers.update(headers)
self._get_cookies()
self.delay = 5
self.jobs_per_page = 20
self.seen_urls = set()
def scrape(self, scraper_input: ScraperInput) -> JobResponse:
"""
Scrapes ZipRecruiter for jobs with scraper_input criteria.
:param scraper_input: Information about job search criteria.
:return: JobResponse containing a list of jobs.
"""
self.scraper_input = scraper_input
job_list: list[JobPost] = []
continue_token = None
max_pages = math.ceil(scraper_input.results_wanted / self.jobs_per_page)
for page in range(1, max_pages + 1):
if len(job_list) >= scraper_input.results_wanted:
break
if page > 1:
time.sleep(self.delay)
log.info(f"search page: {page} / {max_pages}")
jobs_on_page, continue_token = self._find_jobs_in_page(
scraper_input, continue_token
)
if jobs_on_page:
job_list.extend(jobs_on_page)
else:
break
if not continue_token:
break
return JobResponse(jobs=job_list[: scraper_input.results_wanted])
def _find_jobs_in_page(
self, scraper_input: ScraperInput, continue_token: str | None = None
) -> tuple[list[JobPost], str | None]:
"""
Scrapes a page of ZipRecruiter for jobs with scraper_input criteria
:param scraper_input:
:param continue_token:
:return: jobs found on page
"""
jobs_list = []
params = add_params(scraper_input)
if continue_token:
params["continue_from"] = continue_token
try:
res = self.session.get(f"{self.api_url}/jobs-app/jobs", params=params)
if res.status_code not in range(200, 400):
if res.status_code == 429:
err = "429 Response - Blocked by ZipRecruiter for too many requests"
else:
err = f"ZipRecruiter response status code {res.status_code}"
err += f" with response: {res.text}" # ZipRecruiter likely not available in EU
log.error(err)
return jobs_list, ""
except Exception as e:
if "Proxy responded with" in str(e):
log.error(f"Indeed: Bad proxy")
else:
log.error(f"Indeed: {str(e)}")
return jobs_list, ""
res_data = res.json()
jobs_list = res_data.get("jobs", [])
next_continue_token = res_data.get("continue", None)
with ThreadPoolExecutor(max_workers=self.jobs_per_page) as executor:
job_results = [executor.submit(self._process_job, job) for job in jobs_list]
job_list = list(filter(None, (result.result() for result in job_results)))
return job_list, next_continue_token
def _process_job(self, job: dict) -> JobPost | None:
"""
Processes an individual job dict from the response
"""
title = job.get("name")
job_url = f"{self.base_url}/jobs//j?lvk={job['listing_key']}"
if job_url in self.seen_urls:
return
self.seen_urls.add(job_url)
description = job.get("job_description", "").strip()
listing_type = job.get("buyer_type", "")
description = (
markdown_converter(description)
if self.scraper_input.description_format == DescriptionFormat.MARKDOWN
else description
)
company = job.get("hiring_company", {}).get("name")
country_value = "usa" if job.get("job_country") == "US" else "canada"
country_enum = Country.from_string(country_value)
location = Location(
city=job.get("job_city"), state=job.get("job_state"), country=country_enum
)
job_type = get_job_type_enum(
job.get("employment_type", "").replace("_", "").lower()
)
date_posted = datetime.fromisoformat(job["posted_time"].rstrip("Z")).date()
comp_interval = job.get("compensation_interval")
comp_interval = "yearly" if comp_interval == "annual" else comp_interval
comp_min = int(job["compensation_min"]) if "compensation_min" in job else None
comp_max = int(job["compensation_max"]) if "compensation_max" in job else None
comp_currency = job.get("compensation_currency")
description_full, job_url_direct = self._get_descr(job_url)
return JobPost(
id=f'zr-{job["listing_key"]}',
title=title,
company_name=company,
location=location,
job_type=job_type,
compensation=Compensation(
interval=comp_interval,
min_amount=comp_min,
max_amount=comp_max,
currency=comp_currency,
),
date_posted=date_posted,
job_url=job_url,
description=description_full if description_full else description,
emails=extract_emails_from_text(description) if description else None,
job_url_direct=job_url_direct,
listing_type=listing_type,
)
def _get_descr(self, job_url):
res = self.session.get(job_url, allow_redirects=True)
description_full = job_url_direct = None
if res.ok:
soup = BeautifulSoup(res.text, "html.parser")
job_descr_div = soup.find("div", class_="job_description")
company_descr_section = soup.find("section", class_="company_description")
job_description_clean = (
remove_attributes(job_descr_div).prettify(formatter="html")
if job_descr_div
else ""
)
company_description_clean = (
remove_attributes(company_descr_section).prettify(formatter="html")
if company_descr_section
else ""
)
description_full = job_description_clean + company_description_clean
try:
script_tag = soup.find("script", type="application/json")
if script_tag:
job_json = json.loads(script_tag.string)
job_url_val = job_json["model"].get("saveJobURL", "")
m = re.search(r"job_url=(.+)", job_url_val)
if m:
job_url_direct = m.group(1)
except:
job_url_direct = None
if self.scraper_input.description_format == DescriptionFormat.MARKDOWN:
description_full = markdown_converter(description_full)
return description_full, job_url_direct
def _get_cookies(self):
"""
Sends a session event to the API with device properties.
"""
url = f"{self.api_url}/jobs-app/event"
self.session.post(url, data=get_cookie_data)

View File

@@ -0,0 +1,29 @@
headers = {
"Host": "api.ziprecruiter.com",
"accept": "*/*",
"x-zr-zva-override": "100000000;vid:ZT1huzm_EQlDTVEc",
"x-pushnotificationid": "0ff4983d38d7fc5b3370297f2bcffcf4b3321c418f5c22dd152a0264707602a0",
"x-deviceid": "D77B3A92-E589-46A4-8A39-6EF6F1D86006",
"user-agent": "Job Search/87.0 (iPhone; CPU iOS 16_6_1 like Mac OS X)",
"authorization": "Basic YTBlZjMyZDYtN2I0Yy00MWVkLWEyODMtYTI1NDAzMzI0YTcyOg==",
"accept-language": "en-US,en;q=0.9",
}
get_cookie_data = [
("event_type", "session"),
("logged_in", "false"),
("number_of_retry", "1"),
("property", "model:iPhone"),
("property", "os:iOS"),
("property", "locale:en_us"),
("property", "app_build_number:4734"),
("property", "app_version:91.0"),
("property", "manufacturer:Apple"),
("property", "timestamp:2025-01-12T12:04:42-06:00"),
("property", "screen_height:852"),
("property", "os_version:16.6.1"),
("property", "source:install"),
("property", "screen_width:393"),
("property", "device_model:iPhone 14 Pro"),
("property", "brand:Apple"),
]

View File

@@ -0,0 +1,31 @@
from jobspy.model import JobType
def add_params(scraper_input) -> dict[str, str | int]:
params: dict[str, str | int] = {
"search": scraper_input.search_term,
"location": scraper_input.location,
}
if scraper_input.hours_old:
params["days"] = max(scraper_input.hours_old // 24, 1)
job_type_map = {JobType.FULL_TIME: "full_time", JobType.PART_TIME: "part_time"}
if scraper_input.job_type:
job_type = scraper_input.job_type
params["employment_type"] = job_type_map.get(job_type, job_type.value[0])
if scraper_input.easy_apply:
params["zipapply"] = 1
if scraper_input.is_remote:
params["remote"] = 1
if scraper_input.distance:
params["radius"] = scraper_input.distance
return {k: v for k, v in params.items() if v is not None}
def get_job_type_enum(job_type_str: str) -> list[JobType] | None:
for job_type in JobType:
if job_type_str in job_type.value:
return [job_type]
return None