JobSpy/jobspy/util.py

348 lines
10 KiB
Python
Raw Normal View History

from __future__ import annotations
2023-10-29 06:54:56 -07:00
import logging
2025-02-21 10:29:28 -08:00
import re
from itertools import cycle
2025-02-21 10:29:28 -08:00
import numpy as np
import requests
2024-03-04 15:35:57 -08:00
import tls_client
2025-02-21 10:29:28 -08:00
import urllib3
2024-03-04 15:35:57 -08:00
from markdownify import markdownify as md
from requests.adapters import HTTPAdapter, Retry
2025-02-21 12:14:55 -08:00
from jobspy.model import CompensationInterval, JobType, Site
2023-10-10 09:29:29 -07:00
2025-02-21 10:29:28 -08:00
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
2024-10-19 16:01:59 -07:00
def create_logger(name: str):
logger = logging.getLogger(f"JobSpy:{name}")
logger.propagate = False
if not logger.handlers:
logger.setLevel(logging.INFO)
console_handler = logging.StreamHandler()
format = "%(asctime)s - %(levelname)s - %(name)s - %(message)s"
formatter = logging.Formatter(format)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
return logger
2024-01-28 19:50:41 -08:00
class RotatingProxySession:
def __init__(self, proxies=None):
if isinstance(proxies, str):
self.proxy_cycle = cycle([self.format_proxy(proxies)])
elif isinstance(proxies, list):
self.proxy_cycle = (
cycle([self.format_proxy(proxy) for proxy in proxies])
if proxies
else None
)
else:
self.proxy_cycle = None
@staticmethod
def format_proxy(proxy):
"""Utility method to format a proxy string into a dictionary."""
if proxy.startswith("http://") or proxy.startswith("https://"):
return {"http": proxy, "https": proxy}
return {"http": f"http://{proxy}", "https": f"http://{proxy}"}
class RequestsRotating(RotatingProxySession, requests.Session):
def __init__(self, proxies=None, has_retry=False, delay=1, clear_cookies=False):
RotatingProxySession.__init__(self, proxies=proxies)
requests.Session.__init__(self)
self.clear_cookies = clear_cookies
self.allow_redirects = True
self.setup_session(has_retry, delay)
def setup_session(self, has_retry, delay):
if has_retry:
retries = Retry(
total=3,
connect=3,
status=3,
status_forcelist=[500, 502, 503, 504, 429],
backoff_factor=delay,
)
adapter = HTTPAdapter(max_retries=retries)
self.mount("http://", adapter)
self.mount("https://", adapter)
def request(self, method, url, **kwargs):
if self.clear_cookies:
self.cookies.clear()
if self.proxy_cycle:
next_proxy = next(self.proxy_cycle)
if next_proxy["http"] != "http://localhost":
self.proxies = next_proxy
else:
self.proxies = {}
return requests.Session.request(self, method, url, **kwargs)
class TLSRotating(RotatingProxySession, tls_client.Session):
def __init__(self, proxies=None):
RotatingProxySession.__init__(self, proxies=proxies)
tls_client.Session.__init__(self, random_tls_extension_order=True)
def execute_request(self, *args, **kwargs):
if self.proxy_cycle:
next_proxy = next(self.proxy_cycle)
if next_proxy["http"] != "http://localhost":
self.proxies = next_proxy
else:
self.proxies = {}
response = tls_client.Session.execute_request(self, *args, **kwargs)
response.ok = response.status_code in range(200, 400)
return response
def create_session(
*,
proxies: dict | str | None = None,
ca_cert: str | None = None,
is_tls: bool = True,
has_retry: bool = False,
delay: int = 1,
clear_cookies: bool = False,
) -> requests.Session:
"""
Creates a requests session with optional tls, proxy, and retry settings.
:return: A session object
"""
if is_tls:
session = TLSRotating(proxies=proxies)
else:
session = RequestsRotating(
proxies=proxies,
has_retry=has_retry,
delay=delay,
clear_cookies=clear_cookies,
)
if ca_cert:
session.verify = ca_cert
return session
2025-02-21 10:29:28 -08:00
def set_logger_level(verbose: int):
"""
Adjusts the logger's level. This function allows the logging level to be changed at runtime.
Parameters:
- verbose: int {0, 1, 2} (default=2, all logs)
"""
if verbose is None:
return
level_name = {2: "INFO", 1: "WARNING", 0: "ERROR"}.get(verbose, "INFO")
level = getattr(logging, level_name.upper(), None)
if level is not None:
2024-10-19 16:01:59 -07:00
for logger_name in logging.root.manager.loggerDict:
if logger_name.startswith("JobSpy:"):
logging.getLogger(logger_name).setLevel(level)
else:
raise ValueError(f"Invalid log level: {level_name}")
2024-02-14 14:04:23 -08:00
def markdown_converter(description_html: str):
if description_html is None:
2024-03-04 15:35:57 -08:00
return None
markdown = md(description_html)
return markdown.strip()
2024-02-14 14:04:23 -08:00
2023-10-10 09:29:29 -07:00
def extract_emails_from_text(text: str) -> list[str] | None:
if not text:
return None
email_regex = re.compile(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}")
return email_regex.findall(text)
def get_enum_from_job_type(job_type_str: str) -> JobType | None:
"""
Given a string, returns the corresponding JobType enum member if a match is found.
"""
res = None
for job_type in JobType:
if job_type_str in job_type.value:
res = job_type
return res
2023-11-09 12:57:15 -08:00
2024-02-12 09:02:48 -08:00
2023-11-09 12:57:15 -08:00
def currency_parser(cur_str):
# Remove any non-numerical characters
# except for ',' '.' or '-' (e.g. EUR)
cur_str = re.sub("[^-0-9.,]", "", cur_str)
2023-11-09 12:57:15 -08:00
# Remove any 000s separators (either , or .)
cur_str = re.sub("[.,]", "", cur_str[:-3]) + cur_str[-3:]
2023-11-09 12:57:15 -08:00
if "." in list(cur_str[-3:]):
2023-11-09 12:57:15 -08:00
num = float(cur_str)
elif "," in list(cur_str[-3:]):
num = float(cur_str.replace(",", "."))
2023-11-09 12:57:15 -08:00
else:
num = float(cur_str)
return np.round(num, 2)
def remove_attributes(tag):
for attr in list(tag.attrs):
del tag[attr]
return tag
2024-06-09 15:45:38 -07:00
def extract_salary(
salary_str,
lower_limit=1000,
upper_limit=700000,
hourly_threshold=350,
monthly_threshold=30000,
enforce_annual_salary=False,
2024-06-09 15:45:38 -07:00
):
2024-10-19 16:01:59 -07:00
"""
Extracts salary information from a string and returns the salary interval, min and max salary values, and currency.
(TODO: Needs test cases as the regex is complicated and may not cover all edge cases)
"""
2024-06-09 15:45:38 -07:00
if not salary_str:
return None, None, None, None
2024-08-05 10:47:55 -07:00
annual_max_salary = None
2024-06-09 15:45:38 -07:00
min_max_pattern = r"\$(\d+(?:,\d+)?(?:\.\d+)?)([kK]?)\s*[-—–]\s*(?:\$)?(\d+(?:,\d+)?(?:\.\d+)?)([kK]?)"
def to_int(s):
return int(float(s.replace(",", "")))
def convert_hourly_to_annual(hourly_wage):
return hourly_wage * 2080
def convert_monthly_to_annual(monthly_wage):
return monthly_wage * 12
match = re.search(min_max_pattern, salary_str)
if match:
min_salary = to_int(match.group(1))
max_salary = to_int(match.group(3))
# Handle 'k' suffix for min and max salaries independently
if "k" in match.group(2).lower() or "k" in match.group(4).lower():
min_salary *= 1000
max_salary *= 1000
# Convert to annual if less than the hourly threshold
if min_salary < hourly_threshold:
interval = CompensationInterval.HOURLY.value
annual_min_salary = convert_hourly_to_annual(min_salary)
2024-06-09 15:45:38 -07:00
if max_salary < hourly_threshold:
annual_max_salary = convert_hourly_to_annual(max_salary)
2024-06-09 15:45:38 -07:00
elif min_salary < monthly_threshold:
interval = CompensationInterval.MONTHLY.value
annual_min_salary = convert_monthly_to_annual(min_salary)
2024-06-09 15:45:38 -07:00
if max_salary < monthly_threshold:
annual_max_salary = convert_monthly_to_annual(max_salary)
else:
interval = CompensationInterval.YEARLY.value
annual_min_salary = min_salary
annual_max_salary = max_salary
2024-06-09 15:45:38 -07:00
# Ensure salary range is within specified limits
2024-08-05 10:47:55 -07:00
if not annual_max_salary:
return None, None, None, None
2024-06-09 15:45:38 -07:00
if (
lower_limit <= annual_min_salary <= upper_limit
and lower_limit <= annual_max_salary <= upper_limit
and annual_min_salary < annual_max_salary
2024-06-09 15:45:38 -07:00
):
if enforce_annual_salary:
return interval, annual_min_salary, annual_max_salary, "USD"
else:
return interval, min_salary, max_salary, "USD"
2024-06-09 15:45:38 -07:00
return None, None, None, None
2024-10-24 13:19:40 -07:00
def extract_job_type(description: str):
if not description:
return []
keywords = {
JobType.FULL_TIME: r"full\s?time",
JobType.PART_TIME: r"part\s?time",
JobType.INTERNSHIP: r"internship",
JobType.CONTRACT: r"contract",
}
listing_types = []
for key, pattern in keywords.items():
if re.search(pattern, description, re.IGNORECASE):
listing_types.append(key)
return listing_types if listing_types else None
2025-02-21 12:14:55 -08:00
def map_str_to_site(site_name: str) -> Site:
return Site[site_name.upper()]
def get_enum_from_value(value_str):
for job_type in JobType:
if value_str in job_type.value:
return job_type
raise Exception(f"Invalid job type: {value_str}")
def convert_to_annual(job_data: dict):
if job_data["interval"] == "hourly":
job_data["min_amount"] *= 2080
job_data["max_amount"] *= 2080
if job_data["interval"] == "monthly":
job_data["min_amount"] *= 12
job_data["max_amount"] *= 12
if job_data["interval"] == "weekly":
job_data["min_amount"] *= 52
job_data["max_amount"] *= 52
if job_data["interval"] == "daily":
job_data["min_amount"] *= 260
job_data["max_amount"] *= 260
job_data["interval"] = "yearly"
desired_order = [
"id",
"site",
"job_url",
"job_url_direct",
"title",
"company",
"location",
"date_posted",
"job_type",
"salary_source",
"interval",
"min_amount",
"max_amount",
"currency",
"is_remote",
"job_level",
"job_function",
"listing_type",
"emails",
"description",
"company_industry",
"company_url",
"company_logo",
"company_url_direct",
"company_addresses",
"company_num_employees",
"company_revenue",
"company_description",
]