format: Apply Black formatter to the codebase (#127)

This commit is contained in:
VitaminB16
2024-03-11 04:36:27 +00:00
committed by GitHub
parent e8b4b376b8
commit 94d8f555fd
11 changed files with 524 additions and 208 deletions

View File

@@ -1,3 +1,5 @@
from __future__ import annotations
import pandas as pd
from typing import Tuple
from concurrent.futures import ThreadPoolExecutor, as_completed
@@ -70,6 +72,7 @@ def scrape_jobs(
for site in site_name
]
return site_types
country_enum = Country.from_string(country_indeed)
scraper_input = ScraperInput(
@@ -86,14 +89,15 @@ def scrape_jobs(
results_wanted=results_wanted,
linkedin_company_ids=linkedin_company_ids,
offset=offset,
hours_old=hours_old
hours_old=hours_old,
)
def scrape_site(site: Site) -> Tuple[str, JobResponse]:
scraper_class = SCRAPER_MAPPING[site]
scraper = scraper_class(proxy=proxy)
scraped_data: JobResponse = scraper.scrape(scraper_input)
site_name = 'ZipRecruiter' if site.value.capitalize() == 'Zip_recruiter' else site.value.capitalize()
cap_name = site.value.capitalize()
site_name = "ZipRecruiter" if cap_name == "Zip_recruiter" else cap_name
logger.info(f"{site_name} finished scraping")
return site.value, scraped_data
@@ -117,9 +121,8 @@ def scrape_jobs(
for site, job_response in site_to_jobs_dict.items():
for job in job_response.jobs:
job_data = job.dict()
job_data[
"job_url_hyper"
] = f'<a href="{job_data["job_url"]}">{job_data["job_url"]}</a>'
job_url = job_data["job_url"]
job_data["job_url_hyper"] = f'<a href="{job_url}">{job_url}</a>'
job_data["site"] = site
job_data["company"] = job_data["company_name"]
job_data["job_type"] = (
@@ -156,11 +159,11 @@ def scrape_jobs(
if jobs_dfs:
# Step 1: Filter out all-NA columns from each DataFrame before concatenation
filtered_dfs = [df.dropna(axis=1, how='all') for df in jobs_dfs]
filtered_dfs = [df.dropna(axis=1, how="all") for df in jobs_dfs]
# Step 2: Concatenate the filtered DataFrames
jobs_df = pd.concat(filtered_dfs, ignore_index=True)
# Desired column order
desired_order = [
"site",
@@ -178,7 +181,6 @@ def scrape_jobs(
"is_remote",
"emails",
"description",
"company_url",
"company_url_direct",
"company_addresses",
@@ -191,16 +193,16 @@ def scrape_jobs(
"ceo_name",
"ceo_photo_url",
]
# Step 3: Ensure all desired columns are present, adding missing ones as empty
for column in desired_order:
if column not in jobs_df.columns:
jobs_df[column] = None # Add missing columns as empty
# Reorder the DataFrame according to the desired order
jobs_df = jobs_df[desired_order]
# Step 4: Sort the DataFrame as required
return jobs_df.sort_values(by=['site', 'date_posted'], ascending=[True, False])
return jobs_df.sort_values(by=["site", "date_posted"], ascending=[True, False])
else:
return pd.DataFrame()

View File

@@ -1,3 +1,5 @@
from __future__ import annotations
from typing import Optional
from datetime import date
from enum import Enum
@@ -156,7 +158,7 @@ class Country(Enum):
"""Convert a string to the corresponding Country enum."""
country_str = country_str.strip().lower()
for country in cls:
country_names = country.value[0].split(',')
country_names = country.value[0].split(",")
if country_str in country_names:
return country
valid_countries = [country.value for country in cls]
@@ -178,7 +180,10 @@ class Location(BaseModel):
location_parts.append(self.state)
if isinstance(self.country, str):
location_parts.append(self.country)
elif self.country and self.country not in (Country.US_CANADA, Country.WORLDWIDE):
elif self.country and self.country not in (
Country.US_CANADA,
Country.WORLDWIDE,
):
country_name = self.country.value[0]
if "," in country_name:
country_name = country_name.split(",")[0]

View File

@@ -1,10 +1,12 @@
from __future__ import annotations
from ..jobs import (
Enum,
BaseModel,
JobType,
JobResponse,
Country,
DescriptionFormat
DescriptionFormat,
)

View File

@@ -4,21 +4,23 @@ jobspy.scrapers.glassdoor
This module contains routines to scrape Glassdoor.
"""
import json
import re
from __future__ import annotations
import re
import json
import requests
from typing import Optional
from typing import Optional, Tuple
from datetime import datetime, timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed
from ..utils import extract_emails_from_text
from .. import Scraper, ScraperInput, Site
from ..utils import extract_emails_from_text
from ..exceptions import GlassdoorException
from ..utils import (
create_session,
markdown_converter,
logger
logger,
)
from ...jobs import (
JobPost,
@@ -27,7 +29,7 @@ from ...jobs import (
Location,
JobResponse,
JobType,
DescriptionFormat
DescriptionFormat,
)
@@ -59,25 +61,22 @@ class GlassdoorScraper(Scraper):
self.session = create_session(self.proxy, is_tls=True, has_retry=True)
token = self._get_csrf_token()
self.headers['gd-csrf-token'] = token if token else self.fallback_token
self.headers["gd-csrf-token"] = token if token else self.fallback_token
location_id, location_type = self._get_location(
scraper_input.location, scraper_input.is_remote
)
if location_type is None:
logger.error('Glassdoor: location not parsed')
logger.error("Glassdoor: location not parsed")
return JobResponse(jobs=[])
all_jobs: list[JobPost] = []
cursor = None
for page in range(
1 + (scraper_input.offset // self.jobs_per_page),
min(
(scraper_input.results_wanted // self.jobs_per_page) + 2,
self.max_pages + 1,
),
):
logger.info(f'Glassdoor search page: {page}')
range_start = 1 + (scraper_input.offset // self.jobs_per_page)
tot_pages = (scraper_input.results_wanted // self.jobs_per_page) + 2
range_end = min(tot_pages, self.max_pages + 1)
for page in range(range_start, range_end):
logger.info(f"Glassdoor search page: {page}")
try:
jobs, cursor = self._fetch_jobs_page(
scraper_input, location_id, location_type, page, cursor
@@ -87,7 +86,7 @@ class GlassdoorScraper(Scraper):
all_jobs = all_jobs[: scraper_input.results_wanted]
break
except Exception as e:
logger.error(f'Glassdoor: {str(e)}')
logger.error(f"Glassdoor: {str(e)}")
break
return JobResponse(jobs=all_jobs)
@@ -98,39 +97,48 @@ class GlassdoorScraper(Scraper):
location_type: str,
page_num: int,
cursor: str | None,
) -> (list[JobPost], str | None):
) -> Tuple[list[JobPost], str | None]:
"""
Scrapes a page of Glassdoor for jobs with scraper_input criteria
"""
jobs = []
self.scraper_input = scraper_input
try:
payload = self._add_payload(
location_id, location_type, page_num, cursor
)
payload = self._add_payload(location_id, location_type, page_num, cursor)
response = self.session.post(
f"{self.base_url}/graph", headers=self.headers, timeout_seconds=15, data=payload
f"{self.base_url}/graph",
headers=self.headers,
timeout_seconds=15,
data=payload,
)
if response.status_code != 200:
raise GlassdoorException(f"bad response status code: {response.status_code}")
exc_msg = f"bad response status code: {response.status_code}"
raise GlassdoorException(exc_msg)
res_json = response.json()[0]
if "errors" in res_json:
raise ValueError("Error encountered in API response")
except (requests.exceptions.ReadTimeout, GlassdoorException, ValueError, Exception) as e:
logger.error(f'Glassdoor: {str(e)}')
except (
requests.exceptions.ReadTimeout,
GlassdoorException,
ValueError,
Exception,
) as e:
logger.error(f"Glassdoor: {str(e)}")
return jobs, None
jobs_data = res_json["data"]["jobListings"]["jobListings"]
with ThreadPoolExecutor(max_workers=self.jobs_per_page) as executor:
future_to_job_data = {executor.submit(self._process_job, job): job for job in jobs_data}
future_to_job_data = {
executor.submit(self._process_job, job): job for job in jobs_data
}
for future in as_completed(future_to_job_data):
try:
job_post = future.result()
if job_post:
jobs.append(job_post)
except Exception as exc:
raise GlassdoorException(f'Glassdoor generated an exception: {exc}')
raise GlassdoorException(f"Glassdoor generated an exception: {exc}")
return jobs, self.get_cursor_for_page(
res_json["data"]["jobListings"]["paginationCursors"], page_num + 1
@@ -140,7 +148,9 @@ class GlassdoorScraper(Scraper):
"""
Fetches csrf token needed for API by visiting a generic page
"""
res = self.session.get(f'{self.base_url}/Job/computer-science-jobs.htm', headers=self.headers)
res = self.session.get(
f"{self.base_url}/Job/computer-science-jobs.htm", headers=self.headers
)
pattern = r'"token":\s*"([^"]+)"'
matches = re.findall(pattern, res.text)
token = None
@@ -153,19 +163,20 @@ class GlassdoorScraper(Scraper):
Processes a single job and fetches its description.
"""
job_id = job_data["jobview"]["job"]["listingId"]
job_url = f'{self.base_url}job-listing/j?jl={job_id}'
job_url = f"{self.base_url}job-listing/j?jl={job_id}"
if job_url in self.seen_urls:
return None
self.seen_urls.add(job_url)
job = job_data["jobview"]
title = job["job"]["jobTitleText"]
company_name = job["header"]["employerNameFromSearch"]
company_id = job_data['jobview']['header']['employer']['id']
company_id = job_data["jobview"]["header"]["employer"]["id"]
location_name = job["header"].get("locationName", "")
location_type = job["header"].get("locationType", "")
age_in_days = job["header"].get("ageInDays")
is_remote, location = False, None
date_posted = (datetime.now() - timedelta(days=age_in_days)).date() if age_in_days is not None else None
date_diff = (datetime.now() - timedelta(days=age_in_days)).date()
date_posted = date_diff if age_in_days is not None else None
if location_type == "S":
is_remote = True
@@ -177,9 +188,10 @@ class GlassdoorScraper(Scraper):
description = self._fetch_job_description(job_id)
except:
description = None
company_url = f"{self.base_url}Overview/W-EI_IE{company_id}.htm"
return JobPost(
title=title,
company_url=f"{self.base_url}Overview/W-EI_IE{company_id}.htm" if company_id else None,
company_url=company_url if company_id else None,
company_name=company_name,
date_posted=date_posted,
job_url=job_url,
@@ -201,7 +213,7 @@ class GlassdoorScraper(Scraper):
"variables": {
"jl": job_id,
"queryString": "q",
"pageTypeEnum": "SERP"
"pageTypeEnum": "SERP",
},
"query": """
query JobDetailQuery($jl: Long!, $queryString: String, $pageTypeEnum: PageTypeEnum) {
@@ -216,15 +228,17 @@ class GlassdoorScraper(Scraper):
__typename
}
}
"""
""",
}
]
res = requests.post(url, json=body, headers=self.headers)
if res.status_code != 200:
return None
data = res.json()[0]
desc = data['data']['jobview']['job']['description']
return markdown_converter(desc) if self.scraper_input.description_format == DescriptionFormat.MARKDOWN else desc
desc = data["data"]["jobview"]["job"]["description"]
if self.scraper_input.description_format == DescriptionFormat.MARKDOWN:
desc = markdown_converter(desc)
return desc
def _get_location(self, location: str, is_remote: bool) -> (int, str):
if not location or is_remote:
@@ -234,10 +248,13 @@ class GlassdoorScraper(Scraper):
res = self.session.get(url, headers=self.headers)
if res.status_code != 200:
if res.status_code == 429:
logger.error(f'429 Response - Blocked by Glassdoor for too many requests')
err = f"429 Response - Blocked by Glassdoor for too many requests"
logger.error(err)
return None, None
else:
logger.error(f'Glassdoor response status code {res.status_code}')
err = f"Glassdoor response status code {res.status_code}"
err += f" - {res.text}"
logger.error(f"Glassdoor response status code {res.status_code}")
return None, None
items = res.json()
@@ -248,7 +265,7 @@ class GlassdoorScraper(Scraper):
location_type = "CITY"
elif location_type == "S":
location_type = "STATE"
elif location_type == 'N':
elif location_type == "N":
location_type = "COUNTRY"
return int(items[0]["locationId"]), location_type
@@ -259,7 +276,9 @@ class GlassdoorScraper(Scraper):
page_num: int,
cursor: str | None = None,
) -> str:
fromage = max(self.scraper_input.hours_old // 24, 1) if self.scraper_input.hours_old else None
fromage = None
if self.scraper_input.hours_old:
fromage = max(self.scraper_input.hours_old // 24, 1)
filter_params = []
if self.scraper_input.easy_apply:
filter_params.append({"filterKey": "applicationType", "values": "1"})
@@ -278,9 +297,9 @@ class GlassdoorScraper(Scraper):
"pageNumber": page_num,
"pageCursor": cursor,
"fromage": fromage,
"sort": "date"
"sort": "date",
},
"query": self.query_template
"query": self.query_template,
}
if self.scraper_input.job_type:
payload["variables"]["filterParams"].append(
@@ -512,4 +531,4 @@ class GlassdoorScraper(Scraper):
}
__typename
}
"""
"""

View File

@@ -4,9 +4,13 @@ jobspy.scrapers.indeed
This module contains routines to scrape Indeed.
"""
from __future__ import annotations
import math
from concurrent.futures import ThreadPoolExecutor, Future
from typing import Tuple
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, Future
import requests
@@ -15,7 +19,7 @@ from ..utils import (
extract_emails_from_text,
get_enum_from_job_type,
markdown_converter,
logger
logger,
)
from ...jobs import (
JobPost,
@@ -24,7 +28,7 @@ from ...jobs import (
Location,
JobResponse,
JobType,
DescriptionFormat
DescriptionFormat,
)
@@ -54,30 +58,30 @@ class IndeedScraper(Scraper):
domain, self.api_country_code = self.scraper_input.country.indeed_domain_value
self.base_url = f"https://{domain}.indeed.com"
self.headers = self.api_headers.copy()
self.headers['indeed-co'] = self.scraper_input.country.indeed_domain_value
self.headers["indeed-co"] = self.scraper_input.country.indeed_domain_value
job_list = []
page = 1
cursor = None
offset_pages = math.ceil(self.scraper_input.offset / 100)
for _ in range(offset_pages):
logger.info(f'Indeed skipping search page: {page}')
logger.info(f"Indeed skipping search page: {page}")
__, cursor = self._scrape_page(cursor)
if not __:
logger.info(f'Indeed found no jobs on page: {page}')
logger.info(f"Indeed found no jobs on page: {page}")
break
while len(self.seen_urls) < scraper_input.results_wanted:
logger.info(f'Indeed search page: {page}')
logger.info(f"Indeed search page: {page}")
jobs, cursor = self._scrape_page(cursor)
if not jobs:
logger.info(f'Indeed found no jobs on page: {page}')
logger.info(f"Indeed found no jobs on page: {page}")
break
job_list += jobs
page += 1
return JobResponse(jobs=job_list[:scraper_input.results_wanted])
return JobResponse(jobs=job_list[: scraper_input.results_wanted])
def _scrape_page(self, cursor: str | None) -> (list[JobPost], str | None):
def _scrape_page(self, cursor: str | None) -> Tuple[list[JobPost], str | None]:
"""
Scrapes a page of Indeed for jobs with scraper_input criteria
:param cursor:
@@ -86,31 +90,43 @@ class IndeedScraper(Scraper):
jobs = []
new_cursor = None
filters = self._build_filters()
location = (
self.scraper_input.location
or self.scraper_input.country.value[0].split(",")[-1]
)
query = self.job_search_query.format(
what=self.scraper_input.search_term,
location=self.scraper_input.location if self.scraper_input.location else self.scraper_input.country.value[0].split(',')[-1],
location=location,
radius=self.scraper_input.distance,
dateOnIndeed=self.scraper_input.hours_old,
cursor=f'cursor: "{cursor}"' if cursor else '',
filters=filters
cursor=f'cursor: "{cursor}"' if cursor else "",
filters=filters,
)
payload = {
'query': query,
"query": query,
}
api_headers = self.api_headers.copy()
api_headers['indeed-co'] = self.api_country_code
response = requests.post(self.api_url, headers=api_headers, json=payload, proxies=self.proxy, timeout=10)
api_headers["indeed-co"] = self.api_country_code
response = requests.post(
self.api_url,
headers=api_headers,
json=payload,
proxies=self.proxy,
timeout=10,
)
if response.status_code != 200:
logger.info(f'Indeed responded with status code: {response.status_code} (submit GitHub issue if this appears to be a bug)')
logger.info(
f"Indeed responded with status code: {response.status_code} (submit GitHub issue if this appears to be a bug)"
)
return jobs, new_cursor
data = response.json()
jobs = data['data']['jobSearch']['results']
new_cursor = data['data']['jobSearch']['pageInfo']['nextCursor']
jobs = data["data"]["jobSearch"]["results"]
new_cursor = data["data"]["jobSearch"]["pageInfo"]["nextCursor"]
with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
job_results: list[Future] = [
executor.submit(self._process_job, job['job']) for job in jobs
]
executor.submit(self._process_job, job["job"]) for job in jobs
]
job_list = [result.result() for result in job_results if result.result()]
return job_list, new_cursor
@@ -128,7 +144,9 @@ class IndeedScraper(Scraper):
start: "{start}h"
}}
}}
""".format(start=self.scraper_input.hours_old)
""".format(
start=self.scraper_input.hours_old
)
elif self.scraper_input.job_type or self.scraper_input.is_remote:
job_type_key_mapping = {
JobType.FULL_TIME: "CF3CP",
@@ -171,22 +189,24 @@ class IndeedScraper(Scraper):
if job_url in self.seen_urls:
return
self.seen_urls.add(job_url)
description = job['description']['html']
description = markdown_converter(description) if self.scraper_input.description_format == DescriptionFormat.MARKDOWN else description
description = job["description"]["html"]
if self.scraper_input.description_format == DescriptionFormat.MARKDOWN:
description = markdown_converter(description)
job_type = self._get_job_type(job['attributes'])
job_type = self._get_job_type(job["attributes"])
timestamp_seconds = job["datePublished"] / 1000
date_posted = datetime.fromtimestamp(timestamp_seconds).strftime("%Y-%m-%d")
employer = job['employer'].get('dossier') if job['employer'] else None
employer_details = employer.get('employerDetails', {}) if employer else {}
employer = job["employer"].get("dossier") if job["employer"] else None
employer_details = employer.get("employerDetails", {}) if employer else {}
rel_url = job["employer"]["relativeCompanyPageUrl"] if job["employer"] else None
return JobPost(
title=job["title"],
description=description,
company_name=job['employer'].get("name") if job.get('employer') else None,
company_url=f"{self.base_url}{job['employer']['relativeCompanyPageUrl']}" if job[
'employer'] else None,
company_url_direct=employer['links']['corporateWebsite'] if employer else None,
company_name=job["employer"].get("name") if job.get("employer") else None,
company_url=(f"{self.base_url}{rel_url}" if job["employer"] else None),
company_url_direct=(
employer["links"]["corporateWebsite"] if employer else None
),
location=Location(
city=job.get("location", {}).get("city"),
state=job.get("location", {}).get("admin1Code"),
@@ -196,20 +216,39 @@ class IndeedScraper(Scraper):
compensation=self._get_compensation(job),
date_posted=date_posted,
job_url=job_url,
job_url_direct=job['recruit'].get('viewJobUrl') if job.get('recruit') else None,
job_url_direct=(
job["recruit"].get("viewJobUrl") if job.get("recruit") else None
),
emails=extract_emails_from_text(description) if description else None,
is_remote=self._is_job_remote(job, description),
company_addresses=employer_details['addresses'][0] if employer_details.get('addresses') else None,
company_industry=employer_details['industry'].replace('Iv1', '').replace('_', ' ').title() if employer_details.get('industry') else None,
company_num_employees=employer_details.get('employeesLocalizedLabel'),
company_revenue=employer_details.get('revenueLocalizedLabel'),
company_description=employer_details.get('briefDescription'),
ceo_name=employer_details.get('ceoName'),
ceo_photo_url=employer_details.get('ceoPhotoUrl'),
logo_photo_url=employer['images'].get('squareLogoUrl') if employer and employer.get('images') else None,
banner_photo_url=employer['images'].get('headerImageUrl') if employer and employer.get('images') else None,
company_addresses=(
employer_details["addresses"][0]
if employer_details.get("addresses")
else None
),
company_industry=(
employer_details["industry"]
.replace("Iv1", "")
.replace("_", " ")
.title()
if employer_details.get("industry")
else None
),
company_num_employees=employer_details.get("employeesLocalizedLabel"),
company_revenue=employer_details.get("revenueLocalizedLabel"),
company_description=employer_details.get("briefDescription"),
ceo_name=employer_details.get("ceoName"),
ceo_photo_url=employer_details.get("ceoPhotoUrl"),
logo_photo_url=(
employer["images"].get("squareLogoUrl")
if employer and employer.get("images")
else None
),
banner_photo_url=(
employer["images"].get("headerImageUrl")
if employer and employer.get("images")
else None
),
)
@staticmethod
@@ -221,7 +260,7 @@ class IndeedScraper(Scraper):
"""
job_types: list[JobType] = []
for attribute in attributes:
job_type_str = attribute['label'].replace("-", "").replace(" ", "").lower()
job_type_str = attribute["label"].replace("-", "").replace(" ", "").lower()
job_type = get_enum_from_job_type(job_type_str)
if job_type:
job_types.append(job_type)
@@ -235,33 +274,41 @@ class IndeedScraper(Scraper):
:param job:
:return: compensation object
"""
comp = job['compensation']['baseSalary']
if comp:
interval = IndeedScraper._get_compensation_interval(comp['unitOfWork'])
if interval:
return Compensation(
interval=interval,
min_amount=round(comp['range'].get('min'), 2) if comp['range'].get('min') is not None else None,
max_amount=round(comp['range'].get('max'), 2) if comp['range'].get('max') is not None else None,
currency=job['compensation']['currencyCode']
)
comp = job["compensation"]["baseSalary"]
if not comp:
return None
interval = IndeedScraper._get_compensation_interval(comp["unitOfWork"])
if not interval:
return None
min_range = comp["range"].get("min")
max_range = comp["range"].get("max")
return Compensation(
interval=interval,
min_amount=round(min_range, 2) if min_range is not None else None,
max_amount=round(max_range, 2) if max_range is not None else None,
currency=job["compensation"]["currencyCode"],
)
@staticmethod
def _is_job_remote(job: dict, description: str) -> bool:
"""
Searches the description, location, and attributes to check if job is remote
"""
remote_keywords = ['remote', 'work from home', 'wfh']
remote_keywords = ["remote", "work from home", "wfh"]
is_remote_in_attributes = any(
any(keyword in attr['label'].lower() for keyword in remote_keywords)
for attr in job['attributes']
any(keyword in attr["label"].lower() for keyword in remote_keywords)
for attr in job["attributes"]
)
is_remote_in_description = any(
keyword in description.lower() for keyword in remote_keywords
)
is_remote_in_description = any(keyword in description.lower() for keyword in remote_keywords)
is_remote_in_location = any(
keyword in job['location']['formatted']['long'].lower()
keyword in job["location"]["formatted"]["long"].lower()
for keyword in remote_keywords
)
return is_remote_in_attributes or is_remote_in_description or is_remote_in_location
return (
is_remote_in_attributes or is_remote_in_description or is_remote_in_location
)
@staticmethod
def _get_compensation_interval(interval: str) -> CompensationInterval:
@@ -270,7 +317,7 @@ class IndeedScraper(Scraper):
"YEAR": "YEARLY",
"HOUR": "HOURLY",
"WEEK": "WEEKLY",
"MONTH": "MONTHLY"
"MONTH": "MONTHLY",
}
mapped_interval = interval_mapping.get(interval.upper(), None)
if mapped_interval and mapped_interval in CompensationInterval.__members__:
@@ -279,14 +326,14 @@ class IndeedScraper(Scraper):
raise ValueError(f"Unsupported interval: {interval}")
api_headers = {
'Host': 'apis.indeed.com',
'content-type': 'application/json',
'indeed-api-key': '161092c2017b5bbab13edb12461a62d5a833871e7cad6d9d475304573de67ac8',
'accept': 'application/json',
'indeed-locale': 'en-US',
'accept-language': 'en-US,en;q=0.9',
'user-agent': 'Mozilla/5.0 (iPhone; CPU iPhone OS 16_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 Indeed App 193.1',
'indeed-app-info': 'appv=193.1; appid=com.indeed.jobsearch; osv=16.6.1; os=ios; dtype=phone',
"Host": "apis.indeed.com",
"content-type": "application/json",
"indeed-api-key": "161092c2017b5bbab13edb12461a62d5a833871e7cad6d9d475304573de67ac8",
"accept": "application/json",
"indeed-locale": "en-US",
"accept-language": "en-US,en;q=0.9",
"user-agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 16_6_1 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Mobile/15E148 Indeed App 193.1",
"indeed-app-info": "appv=193.1; appid=com.indeed.jobsearch; osv=16.6.1; os=ios; dtype=phone",
}
job_search_query = """
query GetJobData {{

View File

@@ -4,6 +4,9 @@ jobspy.scrapers.linkedin
This module contains routines to scrape LinkedIn.
"""
from __future__ import annotations
import time
import random
from typing import Optional
@@ -24,14 +27,14 @@ from ...jobs import (
JobType,
Country,
Compensation,
DescriptionFormat
DescriptionFormat,
)
from ..utils import (
logger,
extract_emails_from_text,
get_enum_from_job_type,
currency_parser,
markdown_converter
markdown_converter,
)
@@ -61,26 +64,32 @@ class LinkedInScraper(Scraper):
url_lock = Lock()
page = scraper_input.offset // 25 + 25 if scraper_input.offset else 0
seconds_old = (
scraper_input.hours_old * 3600
if scraper_input.hours_old
else None
scraper_input.hours_old * 3600 if scraper_input.hours_old else None
)
continue_search = (
lambda: len(job_list) < scraper_input.results_wanted and page < 1000
)
continue_search = lambda: len(job_list) < scraper_input.results_wanted and page < 1000
while continue_search():
logger.info(f'LinkedIn search page: {page // 25 + 1}')
logger.info(f"LinkedIn search page: {page // 25 + 1}")
session = create_session(is_tls=False, has_retry=True, delay=5)
params = {
"keywords": scraper_input.search_term,
"location": scraper_input.location,
"distance": scraper_input.distance,
"f_WT": 2 if scraper_input.is_remote else None,
"f_JT": self.job_type_code(scraper_input.job_type)
if scraper_input.job_type
else None,
"f_JT": (
self.job_type_code(scraper_input.job_type)
if scraper_input.job_type
else None
),
"pageNum": 0,
"start": page + scraper_input.offset,
"f_AL": "true" if scraper_input.easy_apply else None,
"f_C": ','.join(map(str, scraper_input.linkedin_company_ids)) if scraper_input.linkedin_company_ids else None,
"f_C": (
",".join(map(str, scraper_input.linkedin_company_ids))
if scraper_input.linkedin_company_ids
else None
),
}
if seconds_old is not None:
params["f_TPR"] = f"r{seconds_old}"
@@ -97,15 +106,19 @@ class LinkedInScraper(Scraper):
)
if response.status_code not in range(200, 400):
if response.status_code == 429:
logger.error(f'429 Response - Blocked by LinkedIn for too many requests')
err = (
f"429 Response - Blocked by LinkedIn for too many requests"
)
else:
logger.error(f'LinkedIn response status code {response.status_code}')
err = f"LinkedIn response status code {response.status_code}"
err += f" - {response.text}"
logger.error(err)
return JobResponse(jobs=job_list)
except Exception as e:
if "Proxy responded with" in str(e):
logger.error(f'LinkedIn: Bad proxy')
logger.error(f"LinkedIn: Bad proxy")
else:
logger.error(f'LinkedIn: {str(e)}')
logger.error(f"LinkedIn: {str(e)}")
return JobResponse(jobs=job_list)
soup = BeautifulSoup(response.text, "html.parser")
@@ -126,11 +139,12 @@ class LinkedInScraper(Scraper):
continue
seen_urls.add(job_url)
try:
job_post = self._process_job(job_card, job_url, scraper_input.linkedin_fetch_description)
fetch_desc = scraper_input.linkedin_fetch_description
job_post = self._process_job(job_card, job_url, fetch_desc)
if job_post:
job_list.append(job_post)
if not continue_search():
break
break
except Exception as e:
raise LinkedInException(str(e))
@@ -141,8 +155,10 @@ class LinkedInScraper(Scraper):
job_list = job_list[: scraper_input.results_wanted]
return JobResponse(jobs=job_list)
def _process_job(self, job_card: Tag, job_url: str, full_descr: bool) -> Optional[JobPost]:
salary_tag = job_card.find('span', class_='job-search-card__salary-info')
def _process_job(
self, job_card: Tag, job_url: str, full_descr: bool
) -> Optional[JobPost]:
salary_tag = job_card.find("span", class_="job-search-card__salary-info")
compensation = None
if salary_tag:
@@ -212,7 +228,9 @@ class LinkedInScraper(Scraper):
"""
try:
session = create_session(is_tls=False, has_retry=True)
response = session.get(job_page_url, headers=self.headers, timeout=5, proxies=self.proxy)
response = session.get(
job_page_url, headers=self.headers, timeout=5, proxies=self.proxy
)
response.raise_for_status()
except:
return None, None
@@ -225,10 +243,12 @@ class LinkedInScraper(Scraper):
)
description = None
if div_content is not None:
def remove_attributes(tag):
for attr in list(tag.attrs):
del tag[attr]
return tag
div_content = remove_attributes(div_content)
description = div_content.prettify(formatter="html")
if self.scraper_input.description_format == DescriptionFormat.MARKDOWN:
@@ -257,11 +277,8 @@ class LinkedInScraper(Scraper):
)
elif len(parts) == 3:
city, state, country = parts
location = Location(
city=city,
state=state,
country=Country.from_string(country)
)
country = Country.from_string(country)
location = Location(city=city, state=state, country=country)
return location
@staticmethod

View File

@@ -1,9 +1,10 @@
import logging
import re
from __future__ import annotations
import numpy as np
import re
import logging
import requests
import tls_client
import numpy as np
from markdownify import markdownify as md
from requests.adapters import HTTPAdapter, Retry
@@ -14,7 +15,8 @@ logger.propagate = False
if not logger.handlers:
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
formatter = logging.Formatter(format)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
@@ -33,7 +35,12 @@ def extract_emails_from_text(text: str) -> list[str] | None:
return email_regex.findall(text)
def create_session(proxy: dict | None = None, is_tls: bool = True, has_retry: bool = False, delay: int = 1) -> requests.Session:
def create_session(
proxy: dict | None = None,
is_tls: bool = True,
has_retry: bool = False,
delay: int = 1,
) -> requests.Session:
"""
Creates a requests session with optional tls, proxy, and retry settings.
:return: A session object
@@ -47,15 +54,17 @@ def create_session(proxy: dict | None = None, is_tls: bool = True, has_retry: bo
if proxy:
session.proxies.update(proxy)
if has_retry:
retries = Retry(total=3,
connect=3,
status=3,
status_forcelist=[500, 502, 503, 504, 429],
backoff_factor=delay)
retries = Retry(
total=3,
connect=3,
status=3,
status_forcelist=[500, 502, 503, 504, 429],
backoff_factor=delay,
)
adapter = HTTPAdapter(max_retries=retries)
session.mount('http://', adapter)
session.mount('https://', adapter)
session.mount("http://", adapter)
session.mount("https://", adapter)
return session
@@ -73,17 +82,15 @@ def get_enum_from_job_type(job_type_str: str) -> JobType | None:
def currency_parser(cur_str):
# Remove any non-numerical characters
# except for ',' '.' or '-' (e.g. EUR)
cur_str = re.sub("[^-0-9.,]", '', cur_str)
cur_str = re.sub("[^-0-9.,]", "", cur_str)
# Remove any 000s separators (either , or .)
cur_str = re.sub("[.,]", '', cur_str[:-3]) + cur_str[-3:]
cur_str = re.sub("[.,]", "", cur_str[:-3]) + cur_str[-3:]
if '.' in list(cur_str[-3:]):
if "." in list(cur_str[-3:]):
num = float(cur_str)
elif ',' in list(cur_str[-3:]):
num = float(cur_str.replace(',', '.'))
elif "," in list(cur_str[-3:]):
num = float(cur_str.replace(",", "."))
else:
num = float(cur_str)
return np.round(num, 2)

View File

@@ -4,6 +4,9 @@ jobspy.scrapers.ziprecruiter
This module contains routines to scrape ZipRecruiter.
"""
from __future__ import annotations
import math
import time
from datetime import datetime
@@ -16,7 +19,7 @@ from ..utils import (
logger,
extract_emails_from_text,
create_session,
markdown_converter
markdown_converter,
)
from ...jobs import (
JobPost,
@@ -25,7 +28,7 @@ from ...jobs import (
JobResponse,
JobType,
Country,
DescriptionFormat
DescriptionFormat,
)
@@ -62,7 +65,7 @@ class ZipRecruiterScraper(Scraper):
break
if page > 1:
time.sleep(self.delay)
logger.info(f'ZipRecruiter search page: {page}')
logger.info(f"ZipRecruiter search page: {page}")
jobs_on_page, continue_token = self._find_jobs_in_page(
scraper_input, continue_token
)
@@ -88,25 +91,24 @@ class ZipRecruiterScraper(Scraper):
if continue_token:
params["continue_from"] = continue_token
try:
res= self.session.get(
f"{self.api_url}/jobs-app/jobs",
headers=self.headers,
params=params
res = self.session.get(
f"{self.api_url}/jobs-app/jobs", headers=self.headers, params=params
)
if res.status_code not in range(200, 400):
if res.status_code == 429:
logger.error(f'429 Response - Blocked by ZipRecruiter for too many requests')
err = "429 Response - Blocked by ZipRecruiter for too many requests"
else:
logger.error(f'ZipRecruiter response status code {res.status_code}')
err = f"ZipRecruiter response status code {res.status_code}"
err += f" with response: {res.text}" # ZipRecruiter likely not available in EU
logger.error(err)
return jobs_list, ""
except Exception as e:
if "Proxy responded with" in str(e):
logger.error(f'Indeed: Bad proxy')
logger.error(f"Indeed: Bad proxy")
else:
logger.error(f'Indeed: {str(e)}')
logger.error(f"Indeed: {str(e)}")
return jobs_list, ""
res_data = res.json()
jobs_list = res_data.get("jobs", [])
next_continue_token = res_data.get("continue", None)
@@ -127,7 +129,11 @@ class ZipRecruiterScraper(Scraper):
self.seen_urls.add(job_url)
description = job.get("job_description", "").strip()
description = markdown_converter(description) if self.scraper_input.description_format == DescriptionFormat.MARKDOWN else description
description = (
markdown_converter(description)
if self.scraper_input.description_format == DescriptionFormat.MARKDOWN
else description
)
company = job.get("hiring_company", {}).get("name")
country_value = "usa" if job.get("job_country") == "US" else "canada"
country_enum = Country.from_string(country_value)
@@ -138,23 +144,22 @@ class ZipRecruiterScraper(Scraper):
job_type = self._get_job_type_enum(
job.get("employment_type", "").replace("_", "").lower()
)
date_posted = datetime.fromisoformat(job['posted_time'].rstrip("Z")).date()
date_posted = datetime.fromisoformat(job["posted_time"].rstrip("Z")).date()
comp_interval = job.get("compensation_interval")
comp_interval = "yearly" if comp_interval == "annual" else comp_interval
comp_min = int(job["compensation_min"]) if "compensation_min" in job else None
comp_max = int(job["compensation_max"]) if "compensation_max" in job else None
comp_currency = job.get("compensation_currency")
return JobPost(
title=title,
company_name=company,
location=location,
job_type=job_type,
compensation=Compensation(
interval="yearly"
if job.get("compensation_interval") == "annual"
else job.get("compensation_interval"),
min_amount=int(job["compensation_min"])
if "compensation_min" in job
else None,
max_amount=int(job["compensation_max"])
if "compensation_max" in job
else None,
currency=job.get("compensation_currency"),
interval=comp_interval,
min_amount=comp_min,
max_amount=comp_max,
currency=comp_currency,
),
date_posted=date_posted,
job_url=job_url,
@@ -163,8 +168,9 @@ class ZipRecruiterScraper(Scraper):
)
def _get_cookies(self):
data="event_type=session&logged_in=false&number_of_retry=1&property=model%3AiPhone&property=os%3AiOS&property=locale%3Aen_us&property=app_build_number%3A4734&property=app_version%3A91.0&property=manufacturer%3AApple&property=timestamp%3A2024-01-12T12%3A04%3A42-06%3A00&property=screen_height%3A852&property=os_version%3A16.6.1&property=source%3Ainstall&property=screen_width%3A393&property=device_model%3AiPhone%2014%20Pro&property=brand%3AApple"
self.session.post(f"{self.api_url}/jobs-app/event", data=data, headers=self.headers)
data = "event_type=session&logged_in=false&number_of_retry=1&property=model%3AiPhone&property=os%3AiOS&property=locale%3Aen_us&property=app_build_number%3A4734&property=app_version%3A91.0&property=manufacturer%3AApple&property=timestamp%3A2024-01-12T12%3A04%3A42-06%3A00&property=screen_height%3A852&property=os_version%3A16.6.1&property=source%3Ainstall&property=screen_width%3A393&property=device_model%3AiPhone%2014%20Pro&property=brand%3AApple"
url = f"{self.api_url}/jobs-app/event"
self.session.post(url, data=data, headers=self.headers)
@staticmethod
def _get_job_type_enum(job_type_str: str) -> list[JobType] | None:
@@ -180,16 +186,13 @@ class ZipRecruiterScraper(Scraper):
"location": scraper_input.location,
}
if scraper_input.hours_old:
fromage = max(scraper_input.hours_old // 24, 1) if scraper_input.hours_old else None
params['days'] = fromage
job_type_map = {
JobType.FULL_TIME: 'full_time',
JobType.PART_TIME: 'part_time'
}
params["days"] = max(scraper_input.hours_old // 24, 1)
job_type_map = {JobType.FULL_TIME: "full_time", JobType.PART_TIME: "part_time"}
if scraper_input.job_type:
params['employment_type'] = job_type_map[scraper_input.job_type] if scraper_input.job_type in job_type_map else scraper_input.job_type.value[0]
job_type = scraper_input.job_type
params["employment_type"] = job_type_map.get(job_type, job_type.value[0])
if scraper_input.easy_apply:
params['zipapply'] = 1
params["zipapply"] = 1
if scraper_input.is_remote:
params["remote"] = 1
if scraper_input.distance: