JobSpy/jobspy/core/scrapers/indeed/__init__.py

302 lines
10 KiB
Python

import re
import sys
import math
import json
from datetime import datetime
from typing import Optional, Tuple, List
import tls_client
import urllib.parse
from bs4 import BeautifulSoup
from bs4.element import Tag
from concurrent.futures import ThreadPoolExecutor, Future
from ...jobs import JobPost, Compensation, CompensationInterval, Location, JobResponse, JobType
from .. import Scraper, ScraperInput, Site, StatusException
class ParsingException(Exception):
pass
class IndeedScraper(Scraper):
def __init__(self):
"""
Initializes IndeedScraper with the Indeed job search url
"""
site = Site(Site.INDEED)
url = "https://www.indeed.com"
super().__init__(site, url)
self.jobs_per_page = 15
self.seen_urls = set()
def scrape_page(
self, scraper_input: ScraperInput, page: int, session: tls_client.Session
) -> tuple[list[JobPost], int]:
"""
Scrapes a page of Indeed for jobs with scraper_input criteria
:param scraper_input:
:param page:
:param session:
:return: jobs found on page, total number of jobs found for search
"""
job_list = []
params = {
"q": scraper_input.search_term,
"l": scraper_input.location,
"radius": scraper_input.distance,
"filter": 0,
"start": 0 + page * 10,
}
sc_values = []
if scraper_input.is_remote:
sc_values.append("attr(DSQF7)")
if scraper_input.job_type:
sc_values.append("jt({})".format(scraper_input.job_type.value))
if sc_values:
params["sc"] = "0kf:" + "".join(sc_values) + ";"
response = session.get(self.url + "/jobs", params=params)
if (
response.status_code != 200
and response.status_code != 307
):
raise StatusException(response.status_code)
soup = BeautifulSoup(response.content, "html.parser")
if "did not match any jobs" in str(soup):
raise ParsingException("Search did not match any jobs")
jobs = IndeedScraper.parse_jobs(
soup
) #: can raise exception, handled by main scrape function
total_num_jobs = IndeedScraper.total_jobs(soup)
if (
not jobs.get("metaData", {})
.get("mosaicProviderJobCardsModel", {})
.get("results")
):
raise Exception("No jobs found.")
def process_job(job) -> Optional[JobPost]:
job_url = f'{self.url}/jobs/viewjob?jk={job["jobkey"]}'
job_url_client = f'{self.url}/viewjob?jk={job["jobkey"]}'
if job_url in self.seen_urls:
return None
snippet_html = BeautifulSoup(job["snippet"], "html.parser")
extracted_salary = job.get("extractedSalary")
compensation = None
if extracted_salary:
salary_snippet = job.get("salarySnippet")
currency = salary_snippet.get("currency") if salary_snippet else None
interval = (extracted_salary.get("type"),)
if isinstance(interval, tuple):
interval = interval[0]
interval = interval.upper()
if interval in CompensationInterval.__members__:
compensation = Compensation(
interval=CompensationInterval[interval],
min_amount=int(extracted_salary.get("max")),
max_amount=int(extracted_salary.get("min")),
currency=currency,
)
job_type = IndeedScraper.get_job_type(job)
timestamp_seconds = job["pubDate"] / 1000
date_posted = datetime.fromtimestamp(timestamp_seconds)
date_posted = date_posted.strftime("%Y-%m-%d")
description = self.get_description(job_url, session)
li_elements = snippet_html.find_all("li")
if description is None and li_elements:
description = " ".join(li.text for li in li_elements)
first_li = snippet_html.find("li")
job_post = JobPost(
title=job["normTitle"],
description=description,
company_name=job["company"],
location=Location(
city=job.get("jobLocationCity"),
state=job.get("jobLocationState"),
),
job_type=job_type,
compensation=compensation,
date_posted=date_posted,
job_url=job_url_client,
)
return job_post
with ThreadPoolExecutor(max_workers=10) as executor:
job_results: list[Future] = [executor.submit(process_job, job) for job in
jobs["metaData"]["mosaicProviderJobCardsModel"]["results"]]
job_list = [result.result() for result in job_results if result.result()]
return job_list, total_num_jobs
def scrape(self, scraper_input: ScraperInput) -> JobResponse:
"""
Scrapes Indeed for jobs with scraper_input criteria
:param scraper_input:
:return: job_response
"""
session = tls_client.Session(
client_identifier="chrome112", random_tls_extension_order=True
)
pages_to_process = (
math.ceil(scraper_input.results_wanted / self.jobs_per_page) - 1
)
try:
#: get first page to initialize session
job_list, total_results = self.scrape_page(scraper_input, 0, session)
with ThreadPoolExecutor(max_workers=10) as executor:
futures: list[Future] = [
executor.submit(self.scrape_page, scraper_input, page, session)
for page in range(1, pages_to_process + 1)
]
for future in futures:
jobs, _ = future.result()
job_list += jobs
except StatusException as e:
return JobResponse(
success=False,
error=f"Indeed returned status code {e.status_code}",
)
except ParsingException as e:
return JobResponse(
success=False,
error=f"Indeed failed to parse response: {e}",
)
except Exception as e:
return JobResponse(
success=False,
error=f"Indeed failed to scrape: {e}",
)
if len(job_list) > scraper_input.results_wanted:
job_list = job_list[: scraper_input.results_wanted]
job_response = JobResponse(
success=True,
jobs=job_list,
total_results=total_results,
)
return job_response
def get_description(self, job_page_url: str, session: tls_client.Session) -> str:
"""
Retrieves job description by going to the job page url
:param job_page_url:
:param session:
:return: description
"""
parsed_url = urllib.parse.urlparse(job_page_url)
params = urllib.parse.parse_qs(parsed_url.query)
jk_value = params.get("jk", [None])[0]
formatted_url = f"{self.url}/viewjob?jk={jk_value}&spa=1"
response = session.get(formatted_url, allow_redirects=True)
if response.status_code not in range(200, 400):
return None
raw_description = response.json()["body"]["jobInfoWrapperModel"][
"jobInfoModel"
]["sanitizedJobDescription"]
soup = BeautifulSoup(raw_description, "html.parser")
text_content = " ".join(soup.get_text().split()).strip()
return text_content
@staticmethod
def get_job_type(job: dict) -> Optional[JobType]:
"""
Parses the job to get JobTypeIndeed
:param job:
:return:
"""
for taxonomy in job["taxonomyAttributes"]:
if taxonomy["label"] == "job-types":
if len(taxonomy["attributes"]) > 0:
job_type_str = (
taxonomy["attributes"][0]["label"]
.replace("-", "_")
.replace(" ", "_")
.upper()
)
return JobType[job_type_str]
return None
@staticmethod
def parse_jobs(soup: BeautifulSoup) -> dict:
"""
Parses the jobs from the soup object
:param soup:
:return: jobs
"""
def find_mosaic_script() -> Optional[Tag]:
"""
Finds jobcards script tag
:return: script_tag
"""
script_tags = soup.find_all("script")
for tag in script_tags:
if (
tag.string
and "mosaic.providerData" in tag.string
and "mosaic-provider-jobcards" in tag.string
):
return tag
return None
script_tag = find_mosaic_script()
if script_tag:
script_str = script_tag.string
pattern = r'window.mosaic.providerData\["mosaic-provider-jobcards"\]\s*=\s*({.*?});'
p = re.compile(pattern, re.DOTALL)
m = p.search(script_str)
if m:
jobs = json.loads(m.group(1).strip())
return jobs
else:
raise ParsingException("Could not find mosaic provider job cards data")
else:
raise ParsingException(
"Could not find a script tag containing mosaic provider data"
)
@staticmethod
def total_jobs(soup: BeautifulSoup) -> int:
"""
Parses the total jobs for that search from soup object
:param soup:
:return: total_num_jobs
"""
script = soup.find("script", string=lambda t: "window._initialData" in t)
pattern = re.compile(r"window._initialData\s*=\s*({.*})\s*;", re.DOTALL)
match = pattern.search(script.string)
total_num_jobs = 0
if match:
json_str = match.group(1)
data = json.loads(json_str)
total_num_jobs = int(data["searchTitleBarModel"]["totalNumResults"])
return total_num_jobs