feat(jobs): remove pages for results_wanted

pull/12/head
Cullen Watson 2023-07-10 22:07:19 -05:00
parent bdb851f1cc
commit 9a01426831
11 changed files with 449 additions and 271 deletions

View File

@ -12,7 +12,12 @@ load_dotenv()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/token")
def create_access_token(data: dict):
def create_access_token(data: dict) -> str:
"""
Creates a JWT token based on the data provided.
:param data
:return: encoded_jwt
"""
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
@ -21,6 +26,12 @@ def create_access_token(data: dict):
async def get_current_user(token: str = Depends(oauth2_scheme)):
"""
Returns the current user associated with the provided JWT token.
:param token
:raises HTTPException: If the token is invalid or the user does not exist.
:return: The UserInDB instance associated with the token.
"""
credential_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
@ -42,6 +53,13 @@ async def get_current_user(token: str = Depends(oauth2_scheme)):
async def get_active_current_user(current_user: UserInDB = Depends(get_current_user)):
"""
Returns the current user if the user account is active.
:param current_user: A UserInDB instance representing the current user.
:raises HTTPException: If the user account is inactive.
:return: The UserInDB instance if the user account is active.
"""
if current_user.disabled:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Inactive user."

View File

@ -1,3 +1,5 @@
from typing import Optional, Union
from passlib.context import CryptContext
from supabase_py import create_client, Client
from fastapi import HTTPException, status
@ -10,6 +12,13 @@ supabase: Client = create_client(SUPABASE_URL, SUPABASE_KEY)
def create_user(user_create: UserInDB):
"""
Creates a new user record in the 'users' table in Supabase.
:param user_create: The data of the user to be created.
:raises HTTPException: If an error occurs while creating the user.
:return: The result of the insert operation.
"""
result = supabase.table("users").insert(user_create.dict()).execute()
print(f"Insert result: {result}")
@ -22,7 +31,13 @@ def create_user(user_create: UserInDB):
return result
def get_user(username: str):
def get_user(username: str) -> Optional[UserInDB]:
"""
Retrieves a user from the 'users' table by their username.
:param username: The username of the user to retrieve.
:return: The user data if found, otherwise None.
"""
result = supabase.table("users").select().eq("username", username).execute()
if "error" in result and result["error"]:
@ -36,15 +51,35 @@ def get_user(username: str):
return None
def verify_password(password: str, hashed_password: str):
def verify_password(password: str, hashed_password: str) -> bool:
"""
Verifies a password against a hashed password using the bcrypt hashing algorithm.
:param password: The plaintext password to verify.
:param hashed_password: The hashed password to compare against.
:return: True if the password matches the hashed password, otherwise False.
"""
return pwd_context.verify(password, hashed_password)
def get_password_hash(password):
def get_password_hash(password: str) -> str:
"""
Hashes a password using the bcrypt hashing algorithm.
:param password: The plaintext password to hash.
:return: The hashed password
"""
return pwd_context.hash(password)
def authenticate_user(username: str, password: str):
def authenticate_user(username: str, password: str) -> Union[UserInDB, bool]:
"""
Authenticates a user based on their username and password.
:param username: The username of the user to authenticate.
:param password: The plaintext password to authenticate.
:return: The authenticated user if the username and password are correct, otherwise False.
"""
user = get_user(username)
if not user:
return False

View File

@ -5,8 +5,14 @@ from api.auth.db_utils import get_user, get_password_hash, create_user
router = APIRouter(prefix="/register", tags=["register"])
@router.post("/")
async def register_new_user(user: UserCreate):
@router.post("/", response_model=dict)
async def register_new_user(user: UserCreate) -> dict:
"""
Creates new user
:param user:
:raises HTTPException: If the username already exists.
:return: A dictionary containing a detail key with a success message.
"""
existing_user = get_user(user.username)
if existing_user is not None:
raise HTTPException(
@ -15,7 +21,6 @@ async def register_new_user(user: UserCreate):
)
hashed_password = get_password_hash(user.password)
print(f"Hashed password: {hashed_password}")
user_create = UserInDB(
username=user.username,
email=user.email,

View File

@ -9,7 +9,15 @@ router = APIRouter(prefix="/token", tags=["token"])
@router.post("/", response_model=Token)
async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends()):
async def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
"""
Authenticates a user and provides an access token.
:param form_data: OAuth2PasswordRequestForm object containing the user's credentials.
:raises HTTPException: If the user cannot be authenticated.
:return: A Token object containing the access token and the token type.
"""
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
@ -19,4 +27,4 @@ async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(
)
access_token = create_access_token(data={"sub": user.username})
return {"access_token": access_token, "token_type": "bearer"}
return Token(access_token=access_token, token_type="bearer")

View File

@ -11,6 +11,7 @@ class JobType(Enum):
TEMPORARY = "temporary"
PER_DIEM = "per_diem"
NIGHTS = "nights"
OTHER = "other"
class Location(BaseModel):
@ -61,9 +62,5 @@ class JobResponse(BaseModel):
success: bool
error: str = None
total_pages: int = None
job_count: int = None
page: int = None
jobs: list[JobPost] = []

View File

@ -1,6 +1,6 @@
from pydantic import BaseModel
from enum import Enum
from ..jobs import JobResponse, JobPost
from ..jobs import JobResponse
class Site(Enum):
@ -16,11 +16,12 @@ class ScraperInput(BaseModel):
location: str
distance: int = 25
results_wanted: int = 15 #: TODO: implement
results_wanted: int = 15
class Scraper: #: to be used as a child class
class Scraper:
def __init__(self, site: Site):
self.site = site
def scrape(self, scraper_input: ScraperInput) -> JobResponse: ...
def scrape(self, scraper_input: ScraperInput) -> JobResponse:
...

View File

@ -1,10 +1,11 @@
import re
import json
from math import ceil
from typing import Optional
import tls_client
from bs4 import BeautifulSoup
from fastapi import HTTPException, status
from bs4.element import Tag
from fastapi import status
from api.core.jobs import *
from api.core.scrapers import Scraper, ScraperInput, Site
@ -16,24 +17,38 @@ class ParsingException(Exception):
class IndeedScraper(Scraper):
def __init__(self):
"""
Initializes IndeedScraper with the Indeed job search url
"""
site = Site(Site.INDEED)
super().__init__(site)
self.url = "https://www.indeed.com/jobs"
def scrape(self, scraper_input: ScraperInput) -> JobResponse:
"""
Scrapes Indeed for jobs with scraper_input criteria
:param scraper_input:
:return: job_response
"""
session = tls_client.Session(
client_identifier="chrome112", random_tls_extension_order=True
)
job_list: list[JobPost] = []
page = 0
processed_jobs, total_num_jobs = 0, 0
seen_urls = set()
while len(job_list) < scraper_input.results_wanted:
params = {
"q": scraper_input.search_term,
"l": scraper_input.location,
"filter": 0,
"start": 0,
"start": 0 + page * 10,
"radius": scraper_input.distance,
}
response = session.get(self.url, params=params)
if response.status_code != status.HTTP_200_OK:
return JobResponse(
success=False,
@ -51,24 +66,30 @@ class IndeedScraper(Scraper):
)
total_num_jobs = IndeedScraper.total_jobs(soup)
total_pages = ceil(total_num_jobs / 15)
job_list: list[JobPost] = []
if not jobs.get('metaData', {}).get("mosaicProviderJobCardsModel", {}).get("results"):
if (
not jobs.get("metaData", {})
.get("mosaicProviderJobCardsModel", {})
.get("results")
):
return JobResponse(
success=False,
error="No jobs found",
)
page_number = jobs["metaData"]["mosaicProviderJobCardsModel"]["pageNumber"]
for job in jobs["metaData"]["mosaicProviderJobCardsModel"]["results"]:
job_url = job["thirdPartyApplyUrl"]
if job_url in seen_urls:
continue
snippet_html = BeautifulSoup(job["snippet"], "html.parser")
extracted_salary = job.get("extractedSalary")
compensation = None
if extracted_salary:
salary_snippet = job.get("salarySnippet")
currency = salary_snippet.get("currency") if salary_snippet else None
currency = (
salary_snippet.get("currency") if salary_snippet else None
)
interval = (extracted_salary.get("type"),)
if isinstance(interval, tuple):
interval = interval[0]
@ -84,9 +105,7 @@ class IndeedScraper(Scraper):
job_type = IndeedScraper.get_job_type(job)
if job.get("thirdPartyApplyUrl"):
delivery = Delivery(
method=DeliveryEnum.URL, value=job["thirdPartyApplyUrl"]
)
delivery = Delivery(method=DeliveryEnum.URL, value=job_url)
else:
delivery = None
timestamp_seconds = job["pubDate"] / 1000
@ -109,19 +128,32 @@ class IndeedScraper(Scraper):
delivery=delivery,
)
job_list.append(job_post)
if len(job_list) >= scraper_input.results_wanted:
break
if (
len(job_list) >= scraper_input.results_wanted
or processed_jobs >= total_num_jobs
):
break
page += 1
job_list = job_list[: scraper_input.results_wanted]
job_response = JobResponse(
success=True,
jobs=job_list,
job_count=total_num_jobs,
page=page_number,
total_pages=total_pages,
)
return job_response
@staticmethod
def get_job_type(data):
for taxonomy in data["taxonomyAttributes"]:
def get_job_type(job: dict) -> Optional[JobType]:
"""
Parses the job to get JobType
:param job:
:return:
"""
for taxonomy in job["taxonomyAttributes"]:
if taxonomy["label"] == "job-types":
if len(taxonomy["attributes"]) > 0:
job_type_str = (
@ -137,19 +169,31 @@ class IndeedScraper(Scraper):
def parse_jobs(soup: BeautifulSoup) -> dict:
"""
Parses the jobs from the soup object
:param soup:
:return: jobs
"""
script_tag = IndeedScraper.find_mosaic_script(soup)
def find_mosaic_script() -> Optional[Tag]:
"""
Finds jobcards script tag
:return: script_tag
"""
script_tags = soup.find_all("script")
for tag in script_tags:
if (
tag.string
and "mosaic.providerData" in tag.string
and "mosaic-provider-jobcards" in tag.string
):
return tag
return None
script_tag = find_mosaic_script()
if script_tag:
script_str = script_tag.string
pattern = r'window.mosaic.providerData\["mosaic-provider-jobcards"\]\s*=\s*({.*?});'
p = re.compile(pattern, re.DOTALL)
m = p.search(script_str)
if m:
jobs = json.loads(m.group(1).strip())
@ -157,10 +201,17 @@ class IndeedScraper(Scraper):
else:
raise ParsingException("Could not find mosaic provider job cards data")
else:
raise ParsingException("Could not find a script tag containing mosaic provider data")
raise ParsingException(
"Could not find a script tag containing mosaic provider data"
)
@staticmethod
def total_jobs(soup):
def total_jobs(soup: BeautifulSoup) -> int:
"""
Parses the total jobs for that search from soup object
:param soup:
:return: total_num_jobs
"""
script = soup.find("script", string=lambda t: "window._initialData" in t)
pattern = re.compile(r"window._initialData\s*=\s*({.*})\s*;", re.DOTALL)
@ -169,17 +220,5 @@ class IndeedScraper(Scraper):
if match:
json_str = match.group(1)
data = json.loads(json_str)
total_num_jobs = data["searchTitleBarModel"]["totalNumResults"]
total_num_jobs = int(data["searchTitleBarModel"]["totalNumResults"])
return total_num_jobs
@staticmethod
def find_mosaic_script(soup):
script_tags = soup.find_all("script")
for script_tag in script_tags:
if (
script_tag.string
and "mosaic.providerData" in script_tag.string
and "mosaic-provider-jobcards" in script_tag.string
):
return script_tag
return None

View File

@ -1,8 +1,9 @@
from math import ceil
from typing import Optional
import requests
from bs4 import BeautifulSoup
from fastapi import HTTPException, status
from bs4.element import Tag
from fastapi import status
from api.core.scrapers import Scraper, ScraperInput, Site
from api.core.jobs import *
@ -10,22 +11,34 @@ from api.core.jobs import *
class LinkedInScraper(Scraper):
def __init__(self):
"""
Initializes LinkedInScraper with the LinkedIn job search url
"""
site = Site(Site.LINKEDIN)
super().__init__(site)
self.url = "https://www.linkedin.com/jobs"
def scrape(self, scraper_input: ScraperInput) -> JobResponse:
current_page = 0
"""
Scrapes LinkedIn for jobs with scraper_input criteria
:param scraper_input:
:return: job_response
"""
job_list: list[JobPost] = []
seen_urls = set()
page, processed_jobs, job_count = 0, 0, 0
with requests.Session() as session:
while len(job_list) < scraper_input.results_wanted:
params = {
"pageNum": current_page,
"pageNum": page,
"location": scraper_input.location,
"distance": scraper_input.distance,
}
self.url = f"{self.url}/{scraper_input.search_term}-jobs"
response = requests.get(self.url, params=params)
response = session.get(self.url, params=params, allow_redirects=True)
if response.status_code != status.HTTP_200_OK:
return JobResponse(
@ -35,14 +48,21 @@ class LinkedInScraper(Scraper):
soup = BeautifulSoup(response.text, "html.parser")
job_list: list[JobPost] = []
if page == 0:
job_count_text = soup.find(
"span", class_="results-context-header__job-count"
).text
job_count = int("".join(filter(str.isdigit, job_count_text)))
for job_card in soup.find_all(
"div",
class_="base-card relative w-full hover:no-underline focus:no-underline base-card--link base-search-card base-search-card--link job-search-card",
):
job_url_tag = job_card.find("a", class_="base-card__full-link")
job_url = job_url_tag["href"] if job_url_tag else "N/A"
if job_url in seen_urls:
continue
seen_urls.add(job_url)
job_info = job_card.find("div", class_="base-search-card__info")
if job_info is None:
continue
@ -52,7 +72,9 @@ class LinkedInScraper(Scraper):
company_tag = job_info.find("a", class_="hidden-nested-link")
company = company_tag.text.strip() if company_tag else "N/A"
metadata_card = job_info.find("div", class_="base-search-card__metadata")
metadata_card = job_info.find(
"div", class_="base-search-card__metadata"
)
location: Location = LinkedInScraper.get_location(metadata_card)
datetime_tag = metadata_card.find(
@ -72,24 +94,31 @@ class LinkedInScraper(Scraper):
delivery=Delivery(method=DeliveryEnum.URL, value=job_url),
)
job_list.append(job_post)
if len(job_list) >= scraper_input.results_wanted:
break
if (
len(job_list) >= scraper_input.results_wanted
or processed_jobs >= job_count
):
break
job_count_text = soup.find(
"span", class_="results-context-header__job-count"
).text
job_count = int("".join(filter(str.isdigit, job_count_text)))
total_pages = ceil(job_count / 25)
page += 1
job_list = job_list[: scraper_input.results_wanted]
job_response = JobResponse(
success=True,
jobs=job_list,
job_count=job_count,
page=current_page + 1,
total_pages=total_pages,
)
return job_response
@staticmethod
def get_location(metadata_card):
def get_location(metadata_card: Optional[Tag]) -> Location:
"""
Extracts the location data from the job metadata card.
:param metadata_card
:return: location
"""
location = Location(
country="US",
)

View File

@ -1,8 +1,9 @@
import json
from typing import Optional
from urllib.parse import urlparse, parse_qs
import tls_client
from fastapi import HTTPException, status
from fastapi import status
from bs4 import BeautifulSoup
from api.core.scrapers import Scraper, ScraperInput, Site
@ -11,22 +12,33 @@ from api.core.jobs import *
class ZipRecruiterScraper(Scraper):
def __init__(self):
"""
Initializes LinkedInScraper with the ZipRecruiter job search url
"""
site = Site(Site.ZIP_RECRUITER)
super().__init__(site)
self.url = "https://www.ziprecruiter.com/jobs-search"
def scrape(self, scraper_input: ScraperInput) -> JobResponse:
"""
Scrapes ZipRecruiter for jobs with scraper_input criteria
:param scraper_input:
:return: job_response
"""
session = tls_client.Session(
client_identifier="chrome112", random_tls_extension_order=True
)
current_page = 1
job_list: list[JobPost] = []
page = 1
processed_jobs, job_count = 0, 0
seen_urls = set()
while len(job_list) < scraper_input.results_wanted:
params = {
"search": scraper_input.search_term,
"location": scraper_input.location,
"page": min(current_page, 10),
"page": page,
"radius": scraper_input.distance,
}
@ -41,11 +53,20 @@ class ZipRecruiterScraper(Scraper):
html_string = response.content
soup = BeautifulSoup(html_string, "html.parser")
if page == 1:
script_tag = soup.find("script", {"id": "js_variables"})
data = json.loads(script_tag.string)
job_count = data["totalJobCount"]
job_count = int(job_count.replace(",", ""))
job_posts = soup.find_all("div", {"class": "job_content"})
job_list: list[JobPost] = []
for job in job_posts:
processed_jobs += 1
job_url = job.find("a", {"class": "job_link"})["href"]
if job_url in seen_urls:
continue
title = job.find("h2", {"class": "title"}).text
company = job.find("a", {"class": "company_name"}).text.strip()
description = job.find("p", {"class": "job_snippet"}).text.strip()
@ -56,7 +77,6 @@ class ZipRecruiterScraper(Scraper):
else None
)
url = job.find("a", {"class": "job_link"})["href"]
date_posted = ZipRecruiterScraper.get_date_posted(job)
job_type = job_type.replace(" ", "_") if job_type else job_type
@ -68,30 +88,35 @@ class ZipRecruiterScraper(Scraper):
job_type=job_type,
compensation=ZipRecruiterScraper.get_compensation(job),
date_posted=date_posted,
delivery=Delivery(method=DeliveryEnum.URL, value=url),
delivery=Delivery(method=DeliveryEnum.URL, value=job_url),
)
job_list.append(job_post)
if len(job_list) > 20:
if len(job_list) >= scraper_input.results_wanted:
break
script_tag = soup.find("script", {"id": "js_variables"})
if (
len(job_list) >= scraper_input.results_wanted
or processed_jobs >= job_count
):
break
data = json.loads(script_tag.string)
page += 1
job_count = data["totalJobCount"]
job_count = job_count.replace(",", "")
total_pages = data["maxPages"]
job_list = job_list[: scraper_input.results_wanted]
job_response = JobResponse(
success=True,
jobs=job_list,
job_count=job_count,
page=params["page"],
total_pages=total_pages,
)
return job_response
@staticmethod
def get_interval(interval_str):
def get_interval(interval_str: str):
"""
Maps the interval alias to its appropriate CompensationInterval.
:param interval_str
:return: CompensationInterval
"""
interval_alias = {"annually": CompensationInterval.YEARLY}
interval_str = interval_str.lower()
@ -101,7 +126,12 @@ class ZipRecruiterScraper(Scraper):
return CompensationInterval(interval_str)
@staticmethod
def get_date_posted(job: BeautifulSoup):
def get_date_posted(job: BeautifulSoup) -> Optional[str]:
"""
Extracts the date a job was posted
:param job
:return: date the job was posted or None
"""
button = job.find(
"button", {"class": "action_input save_job zrs_btn_secondary_200"}
)
@ -111,27 +141,23 @@ class ZipRecruiterScraper(Scraper):
return params.get("posted_time", [None])[0]
@staticmethod
def get_compensation(job: BeautifulSoup):
def get_compensation(job: BeautifulSoup) -> Optional[Compensation]:
"""
Parses the compensation tag from the job BeautifulSoup object
:param job
:return: Compensation object or None
"""
pay_element = job.find("li", {"class": "perk_item perk_pay"})
if pay_element is None:
return None
pay = pay_element.find("div", {"class": "value"}).find("span").text.strip()
return ZipRecruiterScraper.create_compensation_object(pay)
@staticmethod
def get_location(job: BeautifulSoup):
location_string = job.find("a", {"class": "company_location"}).text.strip()
parts = location_string.split(", ")
city, state = parts
return Location(
country="US",
city=city,
state=state,
)
@staticmethod
def create_compensation_object(pay_string: str):
def create_compensation_object(pay_string: str) -> Compensation:
"""
Creates a Compensation object from a pay_string
:param pay_string
:return: compensation
"""
interval = ZipRecruiterScraper.get_interval(pay_string.split()[-1])
amounts = []
@ -150,8 +176,30 @@ class ZipRecruiterScraper(Scraper):
return compensation
return create_compensation_object(pay)
@staticmethod
def headers():
def get_location(job: BeautifulSoup) -> Location:
"""
Extracts the job location from BeatifulSoup object
:param job:
:return: location
"""
location_string = job.find("a", {"class": "company_location"}).text.strip()
parts = location_string.split(", ")
city, state = parts
return Location(
country="US",
city=city,
state=state,
)
@staticmethod
def headers() -> dict:
"""
Returns headers needed for requests
:return: dict - Dictionary containing headers
"""
return {
"User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_14_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/78.0.3904.97 Safari/537.36"
}

View File

@ -1,4 +1,4 @@
from fastapi import APIRouter, Depends
from fastapi import APIRouter
from api.core.scrapers.indeed import IndeedScraper
from api.core.scrapers.ziprecruiter import ZipRecruiterScraper
@ -15,9 +15,7 @@ SCRAPER_MAPPING = {
@router.post("/", response_model=JobResponse)
async def scrape_jobs(
scraper_input: ScraperInput
):
async def scrape_jobs(scraper_input: ScraperInput):
scraper_class = SCRAPER_MAPPING[scraper_input.site_type]
scraper = scraper_class()

View File

@ -6,4 +6,4 @@ SUPABASE_URL = os.environ.get("SUPABASE_URL")
SUPABASE_KEY = os.environ.get("SUPABASE_KEY")
JWT_SECRET_KEY = os.environ.get("JWT_SECRET_KEY")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
ACCESS_TOKEN_EXPIRE_MINUTES = 120