Merge pull request #5 from JobSpy-ai/features/scraper-refactor

Features/scraper refactor
pull/12/head
Zachary Hampton 2023-07-10 17:52:26 -05:00 committed by GitHub
commit 47a8c62d44
5 changed files with 59 additions and 24 deletions

View File

@ -58,8 +58,12 @@ class JobPost(BaseModel):
class JobResponse(BaseModel): class JobResponse(BaseModel):
job_count: int success: bool
page: int = 1 error: str = None
total_pages: int
total_pages: int = None
job_count: int = None
page: int = None
jobs: list[JobPost] = []
jobs: list[JobPost]

View File

@ -1,6 +1,6 @@
from pydantic import BaseModel from pydantic import BaseModel
from enum import Enum from enum import Enum
from ..jobs import JobResponse from ..jobs import JobResponse, JobPost
class Site(Enum): class Site(Enum):
@ -13,13 +13,11 @@ class ScraperInput(BaseModel):
location: str location: str
search_term: str search_term: str
distance: int = 25 distance: int = 25
results_wanted: int = 15 #: TODO: implement
page: int = 1
class Scraper: #: to be used as a child class class Scraper: #: to be used as a child class
def __init__(self, site: Site): def __init__(self, site: Site):
self.site = site self.site = site
def scrape(self, scraper_input: ScraperInput) -> JobResponse: def scrape(self, scraper_input: ScraperInput) -> JobResponse: ...
...

View File

@ -10,6 +10,10 @@ from api.core.jobs import *
from api.core.scrapers import Scraper, ScraperInput, Site from api.core.scrapers import Scraper, ScraperInput, Site
class ParsingException(Exception):
pass
class IndeedScraper(Scraper): class IndeedScraper(Scraper):
def __init__(self): def __init__(self):
site = Site(Site.INDEED) site = Site(Site.INDEED)
@ -25,7 +29,7 @@ class IndeedScraper(Scraper):
"q": scraper_input.search_term, "q": scraper_input.search_term,
"l": scraper_input.location, "l": scraper_input.location,
"filter": 0, "filter": 0,
"start": 0 if scraper_input.page is None else (scraper_input.page - 1) * 10, "start": 0,
"radius": scraper_input.distance, "radius": scraper_input.distance,
} }
@ -38,12 +42,25 @@ class IndeedScraper(Scraper):
soup = BeautifulSoup(response.content, "html.parser") soup = BeautifulSoup(response.content, "html.parser")
try:
jobs = IndeedScraper.parse_jobs(soup) jobs = IndeedScraper.parse_jobs(soup)
except ParsingException:
return JobResponse(
success=False,
error="Failed to parse jobs.",
)
total_num_jobs = IndeedScraper.total_jobs(soup) total_num_jobs = IndeedScraper.total_jobs(soup)
total_pages = ceil(total_num_jobs / 15) total_pages = ceil(total_num_jobs / 15)
job_list: list[JobPost] = [] job_list: list[JobPost] = []
# page_number = jobs["metaData"]["mosaicProviderJobCardsModel"]["pageNumber"] if not jobs.get('metaData', {}).get("mosaicProviderJobCardsModel", {}).get("results"):
return JobResponse(
success=False,
error="No jobs found",
)
page_number = jobs["metaData"]["mosaicProviderJobCardsModel"]["pageNumber"]
for job in jobs["metaData"]["mosaicProviderJobCardsModel"]["results"]: for job in jobs["metaData"]["mosaicProviderJobCardsModel"]["results"]:
snippet_html = BeautifulSoup(job["snippet"], "html.parser") snippet_html = BeautifulSoup(job["snippet"], "html.parser")
@ -94,9 +111,10 @@ class IndeedScraper(Scraper):
job_list.append(job_post) job_list.append(job_post)
job_response = JobResponse( job_response = JobResponse(
success=True,
jobs=job_list, jobs=job_list,
job_count=total_num_jobs, job_count=total_num_jobs,
page=scraper_input.page, page=page_number,
total_pages=total_pages, total_pages=total_pages,
) )
return job_response return job_response
@ -116,7 +134,14 @@ class IndeedScraper(Scraper):
return None return None
@staticmethod @staticmethod
def parse_jobs(soup): def parse_jobs(soup: BeautifulSoup) -> dict:
"""
Parses the jobs from the soup object
:param soup:
:return: jobs
"""
script_tag = IndeedScraper.find_mosaic_script(soup) script_tag = IndeedScraper.find_mosaic_script(soup)
if script_tag: if script_tag:
@ -130,11 +155,9 @@ class IndeedScraper(Scraper):
jobs = json.loads(m.group(1).strip()) jobs = json.loads(m.group(1).strip())
return jobs return jobs
else: else:
return {"message": f"Could not find mosaic provider job cards data"} raise ParsingException("Could not find mosaic provider job cards data")
else: else:
return { raise ParsingException("Could not find a script tag containing mosaic provider data")
"message": f"Could not find a script tag containing mosaic provider data"
}
@staticmethod @staticmethod
def total_jobs(soup): def total_jobs(soup):

View File

@ -16,8 +16,10 @@ class LinkedInScraper(Scraper):
self.url = "https://www.linkedin.com/jobs" self.url = "https://www.linkedin.com/jobs"
def scrape(self, scraper_input: ScraperInput) -> JobResponse: def scrape(self, scraper_input: ScraperInput) -> JobResponse:
current_page = 0
params = { params = {
"pageNum": scraper_input.page - 1, "pageNum": current_page,
"location": scraper_input.location, "location": scraper_input.location,
"distance": scraper_input.distance, "distance": scraper_input.distance,
} }
@ -58,6 +60,8 @@ class LinkedInScraper(Scraper):
if datetime_tag: if datetime_tag:
datetime_str = datetime_tag["datetime"] datetime_str = datetime_tag["datetime"]
date_posted = datetime.strptime(datetime_str, "%Y-%m-%d") date_posted = datetime.strptime(datetime_str, "%Y-%m-%d")
else:
date_posted = None
job_post = JobPost( job_post = JobPost(
title=title, title=title,
@ -74,9 +78,11 @@ class LinkedInScraper(Scraper):
job_count = int("".join(filter(str.isdigit, job_count_text))) job_count = int("".join(filter(str.isdigit, job_count_text)))
total_pages = ceil(job_count / 25) total_pages = ceil(job_count / 25)
job_response = JobResponse( job_response = JobResponse(
success=True,
jobs=job_list, jobs=job_list,
job_count=job_count, job_count=job_count,
page=scraper_input.page, page=current_page + 1,
total_pages=total_pages, total_pages=total_pages,
) )
return job_response return job_response

View File

@ -21,10 +21,12 @@ class ZipRecruiterScraper(Scraper):
client_identifier="chrome112", random_tls_extension_order=True client_identifier="chrome112", random_tls_extension_order=True
) )
current_page = 1
params = { params = {
"search": scraper_input.search_term, "search": scraper_input.search_term,
"location": scraper_input.location, "location": scraper_input.location,
"page": min(scraper_input.page, 10), "page": min(current_page, 10),
"radius": scraper_input.distance, "radius": scraper_input.distance,
} }
@ -80,6 +82,7 @@ class ZipRecruiterScraper(Scraper):
job_count = job_count.replace(",", "") job_count = job_count.replace(",", "")
total_pages = data["maxPages"] total_pages = data["maxPages"]
job_response = JobResponse( job_response = JobResponse(
success=True,
jobs=job_list, jobs=job_list,
job_count=job_count, job_count=job_count,
page=params["page"], page=params["page"],
@ -87,6 +90,7 @@ class ZipRecruiterScraper(Scraper):
) )
return job_response return job_response
@staticmethod
def get_interval(interval_str): def get_interval(interval_str):
interval_alias = {"annually": CompensationInterval.YEARLY} interval_alias = {"annually": CompensationInterval.YEARLY}
interval_str = interval_str.lower() interval_str = interval_str.lower()
@ -97,7 +101,7 @@ class ZipRecruiterScraper(Scraper):
return CompensationInterval(interval_str) return CompensationInterval(interval_str)
@staticmethod @staticmethod
def get_date_posted(job: str): def get_date_posted(job: BeautifulSoup):
button = job.find( button = job.find(
"button", {"class": "action_input save_job zrs_btn_secondary_200"} "button", {"class": "action_input save_job zrs_btn_secondary_200"}
) )
@ -107,7 +111,7 @@ class ZipRecruiterScraper(Scraper):
return params.get("posted_time", [None])[0] return params.get("posted_time", [None])[0]
@staticmethod @staticmethod
def get_compensation(job): def get_compensation(job: BeautifulSoup):
pay_element = job.find("li", {"class": "perk_item perk_pay"}) pay_element = job.find("li", {"class": "perk_item perk_pay"})
if pay_element is None: if pay_element is None:
return None return None
@ -116,7 +120,7 @@ class ZipRecruiterScraper(Scraper):
return ZipRecruiterScraper.create_compensation_object(pay) return ZipRecruiterScraper.create_compensation_object(pay)
@staticmethod @staticmethod
def get_location(job): def get_location(job: BeautifulSoup):
location_string = job.find("a", {"class": "company_location"}).text.strip() location_string = job.find("a", {"class": "company_location"}).text.strip()
parts = location_string.split(", ") parts = location_string.split(", ")
city, state = parts city, state = parts