Merge pull request #8 from JobSpy-ai/features/parallel-page-processing

Features/parallel page processing
Cullen 2023-07-11 12:25:14 -05:00 committed by GitHub
commit b36cfaeca2
5 changed files with 279 additions and 210 deletions

View File

@@ -2,7 +2,7 @@ from typing import Union
 from datetime import datetime
 from enum import Enum
-from pydantic import BaseModel
+from pydantic import BaseModel, validator


 class JobType(Enum):
@@ -57,5 +57,13 @@ class JobResponse(BaseModel):
     success: bool
     error: str = None
-    job_count: int = None
+    total_results: int = None
+    returned_results: int = None
     jobs: list[JobPost] = []
+
+    @validator("returned_results")
+    def set_returned_results(cls, v, values):
+        if v is None and values.get("jobs"):
+            return len(values["jobs"])
+        return v
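Note on the new validator: in pydantic v1, the values argument only contains fields declared before the one being validated, and validators are skipped for unset defaults unless always=True. A minimal, self-contained sketch of the intended behavior (hypothetical class name; jobs is declared first and always=True is added so the back-fill actually fires; list[str] stands in for list[JobPost]):

    from pydantic import BaseModel, validator

    class JobResponseSketch(BaseModel):
        success: bool
        total_results: int = None
        jobs: list[str] = []          # stand-in for list[JobPost]
        returned_results: int = None  # declared after jobs so values can see it

        @validator("returned_results", always=True)
        def set_returned_results(cls, v, values):
            # back-fill from the already-validated jobs field when not set explicitly
            if v is None and values.get("jobs"):
                return len(values["jobs"])
            return v

    assert JobResponseSketch(success=True, jobs=["a", "b"]).returned_results == 2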

View File

@@ -1,6 +1,11 @@
 from ..jobs import *


+class StatusException(Exception):
+    def __init__(self, status_code: int):
+        self.status_code = status_code
+
+
 class Site(Enum):
     LINKEDIN = "linkedin"
     INDEED = "indeed"
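StatusException lets the per-page scrapers raise a non-200 response out of worker threads instead of returning an error JobResponse from deep inside a pagination loop; the top-level scrape methods catch it and convert it into a failed JobResponse. A hedged, standalone sketch of that flow (scrape_page here is a stub, not the real scraper):

    class StatusException(Exception):
        def __init__(self, status_code: int):
            self.status_code = status_code

    def scrape_page(page: int) -> list:
        status_code = 429  # illustrative: pretend the site rate-limited us
        if status_code != 200:
            raise StatusException(status_code)
        return []

    try:
        jobs = scrape_page(0)
    except StatusException as e:
        print(f"scraper returned status code {e.status_code}")  # -> 429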

View File

@@ -1,6 +1,6 @@
 import re
 import json
-from typing import Optional
+from typing import Optional, Tuple, List
 import tls_client
 from bs4 import BeautifulSoup
@@ -8,7 +8,11 @@ from bs4.element import Tag
 from fastapi import status

 from api.core.jobs import *
-from api.core.scrapers import Scraper, ScraperInput, Site
+from api.core.jobs import JobPost
+from api.core.scrapers import Scraper, ScraperInput, Site, StatusException
+from concurrent.futures import ThreadPoolExecutor, Future
+import math


 class ParsingException(Exception):
@@ -25,6 +29,108 @@ class IndeedScraper(Scraper):
         self.url = "https://www.indeed.com/jobs"
         self.job_url = "https://www.indeed.com/viewjob?jk="
+        self.jobs_per_page = 15
+        self.seen_urls = set()
+
+    def scrape_page(
+        self, scraper_input: ScraperInput, page: int, session: tls_client.Session
+    ) -> tuple[list[JobPost], int]:
+        """
+        Scrapes a page of Indeed for jobs with scraper_input criteria
+        :param scraper_input:
+        :param page:
+        :param session:
+        :return: jobs found on page, total number of jobs found for search
+        """
+        job_list = []
+
+        params = {
+            "q": scraper_input.search_term,
+            "location": scraper_input.location,
+            "radius": scraper_input.distance,
+            "filter": 0,
+            "start": 0 + page * 10,
+        }
+        sc_values = []
+        if scraper_input.is_remote:
+            sc_values.append("attr(DSQF7)")
+        if scraper_input.job_type:
+            sc_values.append("jt({})".format(scraper_input.job_type.value))
+
+        if sc_values:
+            params["sc"] = "0kf:" + "".join(sc_values) + ";"
+        response = session.get(self.url, params=params)
+        if (
+            response.status_code != status.HTTP_200_OK
+            and response.status_code != status.HTTP_307_TEMPORARY_REDIRECT
+        ):
+            raise StatusException(response.status_code)
+
+        soup = BeautifulSoup(response.content, "html.parser")
+
+        jobs = IndeedScraper.parse_jobs(
+            soup
+        )  #: can raise exception, handled by main scrape function
+        total_num_jobs = IndeedScraper.total_jobs(soup)
+
+        if (
+            not jobs.get("metaData", {})
+            .get("mosaicProviderJobCardsModel", {})
+            .get("results")
+        ):
+            raise Exception("No jobs found.")
+
+        for job in jobs["metaData"]["mosaicProviderJobCardsModel"]["results"]:
+            job_url = f'{self.job_url}{job["jobkey"]}'
+            if job_url in self.seen_urls:
+                continue
+
+            snippet_html = BeautifulSoup(job["snippet"], "html.parser")
+
+            extracted_salary = job.get("extractedSalary")
+            compensation = None
+            if extracted_salary:
+                salary_snippet = job.get("salarySnippet")
+                currency = salary_snippet.get("currency") if salary_snippet else None
+                interval = (extracted_salary.get("type"),)
+                if isinstance(interval, tuple):
+                    interval = interval[0]
+
+                interval = interval.upper()
+                if interval in CompensationInterval.__members__:
+                    compensation = Compensation(
+                        interval=CompensationInterval[interval],
+                        min_amount=extracted_salary.get("max"),
+                        max_amount=extracted_salary.get("min"),
+                        currency=currency,
+                    )
+
+            job_type = IndeedScraper.get_job_type(job)
+            timestamp_seconds = job["pubDate"] / 1000
+            date_posted = datetime.fromtimestamp(timestamp_seconds)
+
+            first_li = snippet_html.find("li")
+            job_post = JobPost(
+                title=job["normTitle"],
+                description=first_li.text if first_li else None,
+                company_name=job["company"],
+                location=Location(
+                    city=job.get("jobLocationCity"),
+                    state=job.get("jobLocationState"),
+                    postal_code=job.get("jobLocationPostal"),
+                    country="US",
+                ),
+                job_type=job_type,
+                compensation=compensation,
+                date_posted=date_posted,
+                job_url=job_url,
+            )
+            job_list.append(job_post)
+
+        return job_list, total_num_jobs
+
     def scrape(self, scraper_input: ScraperInput) -> JobResponse:
         """
         Scrapes Indeed for jobs with scraper_input criteria
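For reference, the sc filter parameter built in scrape_page concatenates each selected criterion into a single Indeed query value. A worked example with illustrative inputs (remote, full-time):

    sc_values = []
    is_remote, job_type_value = True, "fulltime"  # illustrative inputs
    if is_remote:
        sc_values.append("attr(DSQF7)")
    if job_type_value:
        sc_values.append("jt({})".format(job_type_value))
    sc = "0kf:" + "".join(sc_values) + ";"
    print(sc)  # -> 0kf:attr(DSQF7)jt(fulltime);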
@@ -35,125 +141,48 @@ class IndeedScraper(Scraper):
             client_identifier="chrome112", random_tls_extension_order=True
         )

-        job_list: list[JobPost] = []
-        page = 0
-        processed_jobs, total_num_jobs = 0, 0
-        seen_urls = set()
-
-        while len(job_list) < scraper_input.results_wanted:
-            params = {
-                "q": scraper_input.search_term,
-                "location": scraper_input.location,
-                "radius": scraper_input.distance,
-                "filter": 0,
-                "start": 0 + page * 10,
-            }
-            sc_values = []
-            if scraper_input.is_remote:
-                sc_values.append("attr(DSQF7)")
-            if scraper_input.job_type:
-                sc_values.append("jt({})".format(scraper_input.job_type.value))
-
-            if sc_values:
-                params["sc"] = "0kf:" + "".join(sc_values) + ";"
-            response = session.get(self.url, params=params)
-            if (
-                response.status_code != status.HTTP_200_OK
-                and response.status_code != status.HTTP_307_TEMPORARY_REDIRECT
-            ):
-                return JobResponse(
-                    success=False,
-                    error=f"Response returned {response.status_code}",
-                )
-
-            soup = BeautifulSoup(response.content, "html.parser")
-
-            try:
-                jobs = IndeedScraper.parse_jobs(soup)
-            except ParsingException:
-                return JobResponse(
-                    success=False,
-                    error="Failed to parse jobs.",
-                )
-
-            total_num_jobs = IndeedScraper.total_jobs(soup)
-
-            if (
-                not jobs.get("metaData", {})
-                .get("mosaicProviderJobCardsModel", {})
-                .get("results")
-            ):
-                return JobResponse(
-                    success=False,
-                    error="No jobs found",
-                )
-
-            for job in jobs["metaData"]["mosaicProviderJobCardsModel"]["results"]:
-                processed_jobs += 1
-                job_url = f'{self.job_url}{job["jobkey"]}'
-                if job_url in seen_urls:
-                    continue
-
-                snippet_html = BeautifulSoup(job["snippet"], "html.parser")
-
-                extracted_salary = job.get("extractedSalary")
-                compensation = None
-                if extracted_salary:
-                    salary_snippet = job.get("salarySnippet")
-                    currency = (
-                        salary_snippet.get("currency") if salary_snippet else None
-                    )
-                    interval = (extracted_salary.get("type"),)
-                    if isinstance(interval, tuple):
-                        interval = interval[0]
-
-                    interval = interval.upper()
-                    if interval in CompensationInterval.__members__:
-                        compensation = Compensation(
-                            interval=CompensationInterval[interval],
-                            min_amount=extracted_salary.get("max"),
-                            max_amount=extracted_salary.get("min"),
-                            currency=currency,
-                        )
-
-                job_type = IndeedScraper.get_job_type(job)
-                timestamp_seconds = job["pubDate"] / 1000
-                date_posted = datetime.fromtimestamp(timestamp_seconds)
-
-                first_li = snippet_html.find("li")
-                job_post = JobPost(
-                    title=job["normTitle"],
-                    description=first_li.text if first_li else None,
-                    company_name=job["company"],
-                    location=Location(
-                        city=job.get("jobLocationCity"),
-                        state=job.get("jobLocationState"),
-                        postal_code=job.get("jobLocationPostal"),
-                        country="US",
-                    ),
-                    job_type=job_type,
-                    compensation=compensation,
-                    date_posted=date_posted,
-                    job_url=job_url,
-                )
-                job_list.append(job_post)
-
-                if (
-                    len(job_list) >= scraper_input.results_wanted
-                    or processed_jobs >= total_num_jobs
-                ):
-                    break
-
-            if (
-                len(job_list) >= scraper_input.results_wanted
-                or processed_jobs >= total_num_jobs
-            ):
-                break
-
-            page += 1
-
-        job_list = job_list[: scraper_input.results_wanted]
+        pages_to_process = (
+            math.ceil(scraper_input.results_wanted / self.jobs_per_page) - 1
+        )
+
+        try:
+            #: get first page to initialize session
+            job_list, total_results = self.scrape_page(scraper_input, 0, session)
+
+            with ThreadPoolExecutor(max_workers=10) as executor:
+                futures: list[Future] = [
+                    executor.submit(self.scrape_page, scraper_input, page, session)
+                    for page in range(1, pages_to_process + 1)
+                ]
+
+                for future in futures:
+                    jobs, _ = future.result()
+
+                    job_list += jobs
+        except StatusException as e:
+            return JobResponse(
+                success=False,
+                error=f"Indeed returned status code {e.status_code}",
+            )
+        except ParsingException as e:
+            return JobResponse(
+                success=False,
+                error=f"Indeed failed to parse response: {e}",
+            )
+        except Exception as e:
+            return JobResponse(
+                success=False,
+                error=f"Indeed failed to scrape: {e}",
+            )
+
+        if len(job_list) > scraper_input.results_wanted:
+            job_list = job_list[: scraper_input.results_wanted]

         job_response = JobResponse(
             success=True,
             jobs=job_list,
-            job_count=total_num_jobs,
+            total_results=total_results,
         )
         return job_response
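The rewritten scrape above is the heart of this PR: page 0 is fetched serially to initialize the session and learn total_results, the remaining pages are submitted to a thread pool, and iterating the futures in submission order keeps results in page order. A minimal standalone sketch of the pattern (scrape_page stubbed out; numbers illustrative):

    import math
    from concurrent.futures import ThreadPoolExecutor, Future

    def scrape_page(page: int) -> tuple[list[str], int]:
        # stub standing in for IndeedScraper.scrape_page
        return [f"job-{page}-{i}" for i in range(15)], 600

    results_wanted, jobs_per_page = 50, 15
    pages_to_process = math.ceil(results_wanted / jobs_per_page) - 1  # ceil(50/15) - 1 == 3

    job_list, total_results = scrape_page(0)  # first page, fetched serially
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures: list[Future] = [
            executor.submit(scrape_page, page) for page in range(1, pages_to_process + 1)
        ]
        for future in futures:
            jobs, _ = future.result()  # blocks; iteration order preserves page order
            job_list += jobs

    job_list = job_list[:results_wanted]  # pages 0-3 yield 60 jobs, trimmed to 50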

View File

@@ -131,7 +131,7 @@ class LinkedInScraper(Scraper):
         job_response = JobResponse(
             success=True,
             jobs=job_list,
-            job_count=job_count,
+            total_results=job_count,
         )
         return job_response

View File

@@ -1,13 +1,16 @@
 import json
-from typing import Optional
+from typing import Optional, Tuple, List
 from urllib.parse import urlparse, parse_qs

 import tls_client
 from fastapi import status
 from bs4 import BeautifulSoup
+from concurrent.futures import ThreadPoolExecutor, Future

-from api.core.scrapers import Scraper, ScraperInput, Site
+from api.core.jobs import JobPost
+from api.core.scrapers import Scraper, ScraperInput, Site, StatusException
 from api.core.jobs import *
+import math


 class ZipRecruiterScraper(Scraper):
@@ -19,6 +22,101 @@ class ZipRecruiterScraper(Scraper):
         super().__init__(site)
         self.url = "https://www.ziprecruiter.com/jobs-search"
+        self.jobs_per_page = 20
+        self.seen_urls = set()
+
+    def scrape_page(
+        self, scraper_input: ScraperInput, page: int, session: tls_client.Session
+    ) -> tuple[list[JobPost], int | None]:
+        """
+        Scrapes a page of ZipRecruiter for jobs with scraper_input criteria
+        :param scraper_input:
+        :param page:
+        :param session:
+        :return: jobs found on page, total number of jobs found for search
+        """
+        job_list = []
+
+        job_type_value = None
+        if scraper_input.job_type:
+            if scraper_input.job_type.value == "fulltime":
+                job_type_value = "full_time"
+            elif scraper_input.job_type.value == "parttime":
+                job_type_value = "part_time"
+            else:
+                job_type_value = scraper_input.job_type.value
+
+        params = {
+            "search": scraper_input.search_term,
+            "location": scraper_input.location,
+            "radius": scraper_input.distance,
+            "refine_by_location_type": "only_remote"
+            if scraper_input.is_remote
+            else None,
+            "refine_by_employment": f"employment_type:employment_type:{job_type_value}"
+            if job_type_value
+            else None,
+            "page": page,
+        }
+
+        response = session.get(
+            self.url, headers=ZipRecruiterScraper.headers(), params=params
+        )
+        if response.status_code != status.HTTP_200_OK:
+            raise StatusException(response.status_code)
+
+        html_string = response.content
+        soup = BeautifulSoup(html_string, "html.parser")
+        if page == 1:
+            script_tag = soup.find("script", {"id": "js_variables"})
+            data = json.loads(script_tag.string)
+
+            job_count = int(data["totalJobCount"].replace(",", ""))
+        else:
+            job_count = None
+
+        job_posts = soup.find_all("div", {"class": "job_content"})
+        for job in job_posts:
+            job_url = job.find("a", {"class": "job_link"})["href"]
+            if job_url in self.seen_urls:
+                continue
+
+            title = job.find("h2", {"class": "title"}).text
+            company = job.find("a", {"class": "company_name"}).text.strip()
+            description = job.find("p", {"class": "job_snippet"}).text.strip()
+            job_type_element = job.find("li", {"class": "perk_item perk_type"})
+            if job_type_element:
+                job_type_text = (
+                    job_type_element.text.strip()
+                    .lower()
+                    .replace("-", "")
+                    .replace(" ", "")
+                )
+                if job_type_text == "contractor":
+                    job_type_text = "contract"
+                job_type = JobType(job_type_text)
+            else:
+                job_type = None
+
+            date_posted = ZipRecruiterScraper.get_date_posted(job)
+
+            job_post = JobPost(
+                title=title,
+                description=description,
+                company_name=company,
+                location=ZipRecruiterScraper.get_location(job),
+                job_type=job_type,
+                compensation=ZipRecruiterScraper.get_compensation(job),
+                date_posted=date_posted,
+                job_url=job_url,
+            )
+            job_list.append(job_post)
+
+        return job_list, job_count
+
     def scrape(self, scraper_input: ScraperInput) -> JobResponse:
         """
@@ -30,109 +128,38 @@ class ZipRecruiterScraper(Scraper):
             client_identifier="chrome112", random_tls_extension_order=True
         )

-        job_list: list[JobPost] = []
-        page = 1
-        processed_jobs, job_count = 0, 0
-        seen_urls = set()
-
-        while len(job_list) < scraper_input.results_wanted:
-            job_type_value = None
-            if scraper_input.job_type:
-                if scraper_input.job_type.value == "fulltime":
-                    job_type_value = "full_time"
-                elif scraper_input.job_type.value == "parttime":
-                    job_type_value = "part_time"
-                else:
-                    job_type_value = scraper_input.job_type.value
-
-            params = {
-                "search": scraper_input.search_term,
-                "location": scraper_input.location,
-                "radius": scraper_input.distance,
-                "refine_by_location_type": "only_remote"
-                if scraper_input.is_remote
-                else None,
-                "refine_by_employment": f"employment_type:employment_type:{job_type_value}"
-                if job_type_value
-                else None,
-                "page": page,
-            }
-
-            response = session.get(
-                self.url, headers=ZipRecruiterScraper.headers(), params=params
-            )
-            print(response.url)
-            if response.status_code != status.HTTP_200_OK:
-                return JobResponse(
-                    success=False,
-                    error=f"Response returned {response.status_code}",
-                )
-
-            html_string = response.content
-            soup = BeautifulSoup(html_string, "html.parser")
-            if page == 1:
-                script_tag = soup.find("script", {"id": "js_variables"})
-                data = json.loads(script_tag.string)
-
-                job_count = data["totalJobCount"]
-                job_count = int(job_count.replace(",", ""))
-
-            job_posts = soup.find_all("div", {"class": "job_content"})
-            for job in job_posts:
-                processed_jobs += 1
-                job_url = job.find("a", {"class": "job_link"})["href"]
-                if job_url in seen_urls:
-                    continue
-
-                title = job.find("h2", {"class": "title"}).text
-                company = job.find("a", {"class": "company_name"}).text.strip()
-                description = job.find("p", {"class": "job_snippet"}).text.strip()
-                job_type_element = job.find("li", {"class": "perk_item perk_type"})
-                if job_type_element:
-                    job_type_text = (
-                        job_type_element.text.strip()
-                        .lower()
-                        .replace("-", "")
-                        .replace(" ", "")
-                    )
-                    if job_type_text == "contractor":
-                        job_type_text = "contract"
-                    job_type = JobType(job_type_text)
-                else:
-                    job_type = None
-
-                date_posted = ZipRecruiterScraper.get_date_posted(job)
-
-                job_post = JobPost(
-                    title=title,
-                    description=description,
-                    company_name=company,
-                    location=ZipRecruiterScraper.get_location(job),
-                    job_type=job_type,
-                    compensation=ZipRecruiterScraper.get_compensation(job),
-                    date_posted=date_posted,
-                    job_url=job_url,
-                )
-                job_list.append(job_post)
-
-                if (
-                    len(job_list) >= scraper_input.results_wanted
-                    or processed_jobs >= job_count
-                ):
-                    break
-
-            if (
-                len(job_list) >= scraper_input.results_wanted
-                or processed_jobs >= job_count
-            ):
-                break
-
-            page += 1
-
-        job_list = job_list[: scraper_input.results_wanted]
+        pages_to_process = math.ceil(scraper_input.results_wanted / self.jobs_per_page)
+
+        try:
+            #: get first page to initialize session
+            job_list, total_results = self.scrape_page(scraper_input, 1, session)
+
+            with ThreadPoolExecutor(max_workers=10) as executor:
+                futures: list[Future] = [
+                    executor.submit(self.scrape_page, scraper_input, page, session)
+                    for page in range(2, pages_to_process + 1)
+                ]
+
+                for future in futures:
+                    jobs, _ = future.result()
+
+                    job_list += jobs
+        except StatusException as e:
+            return JobResponse(
+                success=False,
+                error=f"ZipRecruiter returned status code {e.status_code}",
+            )
+
+        #: note: this does not handle if the results are more or less than the results_wanted
+        if len(job_list) > scraper_input.results_wanted:
+            job_list = job_list[: scraper_input.results_wanted]

         job_response = JobResponse(
             success=True,
             jobs=job_list,
-            job_count=job_count,
+            total_results=total_results,
         )
         return job_response
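Unlike Indeed, ZipRecruiter pages are 1-indexed, so page 1 both initializes the session and supplies total_results from the js_variables script tag, and the pool covers pages 2 through pages_to_process. Worked numbers (illustrative):

    import math

    results_wanted, jobs_per_page = 50, 20
    pages_to_process = math.ceil(results_wanted / jobs_per_page)  # ceil(50/20) == 3
    # page 1 is fetched serially; pages 2..3 go to the ThreadPoolExecutor
    print(list(range(2, pages_to_process + 1)))  # -> [2, 3]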