some fixes and logs

pull/231/head
Yariv Menachem 2024-12-23 18:01:07 +02:00
parent b55287b5ec
commit 9a4246901e
4 changed files with 82 additions and 55 deletions

View File

@@ -1,4 +1,5 @@
 from __future__ import annotations
+from threading import Lock
 import pandas as pd
 from typing import Tuple
@@ -118,23 +119,34 @@ def scrape_jobs(
     site_to_jobs_dict = {}
     merged_jobs: list[JobPost] = []
+    lock = Lock()

     def worker(site):
-        site_val, scraped_info = scrape_site(site)
-        # Add the scraped jobs to the merged list
-        # Assuming scraped_info has 'jobs' as a list
-        merged_jobs.extend(scraped_info.jobs)
-        return site_val, scraped_info
+        logger = create_logger(f"Worker {site}")
+        logger.info("Starting")
+        try:
+            site_val, scraped_info = scrape_site(site)
+            with lock:
+                merged_jobs.extend(scraped_info.jobs)
+            logger.info("Finished")
+            return site_val, scraped_info
+        except Exception as e:
+            logger.error(f"Error: {e}")
+            return None, None

-    with ThreadPoolExecutor() as executor:
+    with ThreadPoolExecutor(max_workers=5) as executor:
+        logger = create_logger("ThreadPoolExecutor")
         future_to_site = {
             executor.submit(worker, site): site for site in scraper_input.site_type
         }
+        # An iterator over the given futures that yields each as it completes.
         for future in as_completed(future_to_site):
-            site_value, scraped_data = future.result()
-            site_to_jobs_dict[site_value] = scraped_data
+            try:
+                site_value, scraped_data = future.result()
+                if site_value and scraped_data:
+                    site_to_jobs_dict[site_value] = scraped_data
+            except Exception as e:
+                logger.error(f"Future Error occurred: {e}")

     return merged_jobs
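For context, the concurrency pattern this hunk introduces reduces to the standalone sketch below; every name in it is an illustrative stand-in, not the project's API. as_completed() yields each future as its worker finishes, and the shared list is extended under a Lock because worker() runs on several threads at once.

from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock

merged: list[str] = []
lock = Lock()

def worker(site: str) -> tuple[str, list[str]]:
    # Stand-in for scrape_site(site): pretend each site yields two jobs.
    results = [f"{site}-job-1", f"{site}-job-2"]
    with lock:  # serialize extends of the shared list across threads
        merged.extend(results)
    return site, results

with ThreadPoolExecutor(max_workers=5) as executor:
    futures = {executor.submit(worker, s): s for s in ("indeed", "glassdoor")}
    for future in as_completed(futures):  # yields in completion order
        site, results = future.result()
        print(site, len(results))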

View File

@@ -7,8 +7,8 @@ from jobspy.scrapers.utils import create_logger
 from jobspy.telegram_bot import TelegramBot

 logger = create_logger("Main")
-filter_by_title: list[str] = ["test", "qa", "Lead", "Full-Stack", "Full Stack", "Fullstack", "Frontend", "Front-end", "Front End", "DevOps", "Physical", "Staff"
-                              "data", "automation", "BI", "Principal", "Architect", "Android", "Machine Learning", "Student"]
+filter_by_title: list[str] = ["test", "qa", "Lead", "Full-Stack", "Full Stack", "Fullstack", "Frontend", "Front-end", "Front End", "DevOps", "Physical", "Staff",
+                              "automation", "BI", "Principal", "Architect", "Android", "Machine Learning", "Student", "Data Engineer"]

 def filter_jobs_by_title_name(job: JobPost):
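The comma added after "Staff" fixes a real bug, not just style: without it, Python's implicit string-literal concatenation silently merged the last entry of the first line with the first entry of the second. A minimal demonstration:

# Missing comma: adjacent literals concatenate across the line break.
broken = ["Physical", "Staff"
          "data", "automation"]
print(broken)  # ['Physical', 'Staffdata', 'automation'] -- one entry short

fixed = ["Physical", "Staff",
         "data", "automation"]
print(fixed)   # ['Physical', 'Staff', 'data', 'automation']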
@@ -25,25 +25,23 @@ async def main():
     telegramBot = TelegramBot()
     jobRepository = JobRepository()
     # sites_to_scrap = [Site.LINKEDIN, Site.GLASSDOOR, Site.INDEED, Site.GOOZALI]
-    sites_to_scrap = [Site.GOOZALI]
-    for site in sites_to_scrap:
-        jobs = scrape_jobs(
-            site_name=[site],
-            search_term="software engineer",
-            google_search_term="software engineer jobs near Tel Aviv Israel since yesterday",
-            locations=["Tel Aviv, Israel", "Ramat Gan, Israel",
-                       "Central, Israel", "Rehovot ,Israel"],
-            results_wanted=200,
-            hours_old=200,
-            country_indeed='israel'
-        )
-        logger.info(f"Found {len(jobs)} jobs")
-        jobs = list(filter(filter_jobs_by_title_name, jobs))
-
-        newJobs = jobRepository.insertManyIfNotFound(jobs)
-
-        for newJob in newJobs:
-            await telegramBot.sendJob(newJob)
+    sites_to_scrap = [Site.GLASSDOOR]
+    # sites_to_scrap = [Site.GOOZALI]
+    jobs = scrape_jobs(
+        site_name=sites_to_scrap,
+        search_term="software engineer",
+        google_search_term="software engineer jobs near Tel Aviv Israel since yesterday",
+        locations=["Tel Aviv, Israel", "Ramat Gan, Israel",
+                   "Central, Israel", "Rehovot ,Israel"],
+        results_wanted=200,
+        hours_old=48,
+        country_indeed='israel'
+    )
+    logger.info(f"Found {len(jobs)} jobs")
+    jobs = list(filter(filter_jobs_by_title_name, jobs))
+    newJobs = jobRepository.insertManyIfNotFound(jobs)
+    for newJob in newJobs:
+        await telegramBot.sendJob(newJob)

 # Run the async main function
 if __name__ == "__main__":

View File

@@ -37,6 +37,7 @@ from ...jobs import (
 logger = create_logger("Glassdoor")

+
 class GlassdoorScraper(Scraper):
     def __init__(
         self, proxies: list[str] | str | None = None, ca_cert: str | None = None
@@ -62,7 +63,8 @@ class GlassdoorScraper(Scraper):
         :return: JobResponse containing a list of jobs.
         """
         self.scraper_input = scraper_input
-        self.scraper_input.results_wanted = min(900, scraper_input.results_wanted)
+        self.scraper_input.results_wanted = min(
+            900, scraper_input.results_wanted)
         self.base_url = self.scraper_input.country.get_glassdoor_url()
         self.session = create_session(
@@ -71,16 +73,17 @@ class GlassdoorScraper(Scraper):
         token = self._get_csrf_token()
         headers["gd-csrf-token"] = token if token else fallback_token
         self.session.headers.update(headers)
-        job_list: list[JobPost] = [];
+        job_list: list[JobPost] = []
         for location in scraper_input.locations:
             glassDoorLocatiions: List[GlassDoorLocationResponse] = self._get_locations(
                 location, scraper_input.is_remote
             )
             for glassDoorLocatiion in glassDoorLocatiions:
                 logger.info(f"Location: {glassDoorLocatiion.longName}")
-                locationType = get_location_type(glassDoorLocatiion);
-                locationId = get_location_id(glassDoorLocatiion);
-                jobs_temp = self.get_jobs(scraper_input,locationId,locationType);
+                locationType = get_location_type(glassDoorLocatiion)
+                locationId = get_location_id(glassDoorLocatiion)
+                jobs_temp = self.get_jobs(
+                    scraper_input, locationId, locationType)
                 if (jobs_temp is not None and len(jobs_temp) > 1):
                     job_list.extend(jobs_temp)
         return JobResponse(jobs=job_list)
@@ -99,7 +102,8 @@ class GlassdoorScraper(Scraper):
         jobs = []
         self.scraper_input = scraper_input
         try:
-            payload = self._add_payload(location_id, location_type, page_num, cursor)
+            payload = self._add_payload(
+                location_id, location_type, page_num, cursor)
             response = self.session.post(
                 f"{self.base_url}/graph",
                 timeout_seconds=15,
@@ -132,7 +136,8 @@ class GlassdoorScraper(Scraper):
                 if job_post:
                     jobs.append(job_post)
         except Exception as exc:
-            raise GlassdoorException(f"Glassdoor generated an exception: {exc}")
+            raise GlassdoorException(
+                f"Glassdoor generated an exception: {exc}")

         return jobs, self.get_cursor_for_page(
             res_json["data"]["jobListings"]["paginationCursors"], page_num + 1
@@ -150,7 +155,8 @@ class GlassdoorScraper(Scraper):
         cursor = None

         range_start = 1 + (scraper_input.offset // self.jobs_per_page)
-        tot_pages = (scraper_input.results_wanted // self.jobs_per_page) + 2
+        tot_pages = (scraper_input.results_wanted //
+                     self.jobs_per_page) + 2
         range_end = min(tot_pages, self.max_pages + 1)
         for page in range(range_start, range_end):
             logger.info(f"search page: {page} / {range_end-1}")
@@ -174,7 +180,8 @@ class GlassdoorScraper(Scraper):
         """
         Fetches csrf token needed for API by visiting a generic page
         """
-        res = self.session.get(f"{self.base_url}/Job/computer-science-jobs.htm")
+        res = self.session.get(
+            f"{self.base_url}/Job/computer-science-jobs.htm")
         pattern = r'"token":\s*"([^"]+)"'
         matches = re.findall(pattern, res.text)
         token = None
@@ -234,7 +241,8 @@ class GlassdoorScraper(Scraper):
             compensation=compensation,
             is_remote=is_remote,
             description=description,
-            emails=extract_emails_from_text(description) if description else None,
+            emails=extract_emails_from_text(
+                description) if description else None,
             company_logo=company_logo,
             listing_type=listing_type,
         )
@@ -280,7 +288,8 @@ class GlassdoorScraper(Scraper):
     def _get_location(self, location: str, is_remote: bool) -> (int, str):
         if not location or is_remote:
             return "11047", "STATE"  # remote options
-        url = f"{self.base_url}/findPopularLocationAjax.htm?maxLocationsToReturn=10&term={location}"
+        url = f"{
+            self.base_url}/findPopularLocationAjax.htm?maxLocationsToReturn=10&term={location}"
         res = self.session.get(url)
         if res.status_code != 200:
             if res.status_code == 429:
@@ -290,7 +299,8 @@ class GlassdoorScraper(Scraper):
             else:
                 err = f"Glassdoor response status code {res.status_code}"
                 err += f" - {res.text}"
-                logger.error(f"Glassdoor response status code {res.status_code}")
+                logger.error(f"Glassdoor response status code {
+                             res.status_code}")
             return None, None

         items = res.json()
@@ -308,13 +318,15 @@ class GlassdoorScraper(Scraper):
         return int(items[0]["locationId"]), location_type

     # Example string 'Tel Aviv, Israel'
-    def get_city_from_location(self, location:str) -> str:
-        return location.split(',')[0].strip() # Replace space with %2 to get "Tel%2Aviv"
+    def get_city_from_location(self, location: str) -> str:
+        # Replace space with %2 to get "Tel%2Aviv"
+        return location.split(',')[0].strip()

     def _get_locations(self, location: str, is_remote: bool) -> List[GlassDoorLocationResponse]:
         if not location or is_remote:
             return "11047", "STATE"  # remote options
-        url = f"{self.base_url}/findPopularLocationAjax.htm?maxLocationsToReturn=10&term={location}"
+        url = f"{
+            self.base_url}/findPopularLocationAjax.htm?maxLocationsToReturn=10&term={location}"
         res = self.session.get(url)
         if res.status_code != 200:
             if res.status_code == 429:
@@ -324,7 +336,8 @@ class GlassdoorScraper(Scraper):
             else:
                 err = f"Glassdoor response status code {res.status_code}"
                 err += f" - {res.text}"
-                logger.error(f"Glassdoor response status code {res.status_code}")
+                logger.error(f"Glassdoor response status code {
+                             res.status_code}")
             return None, None

         formatted_city = self.get_city_from_location(location)
         items: List[GlassDoorLocationResponse] = [
@@ -337,7 +350,7 @@ class GlassdoorScraper(Scraper):
             logger.error(f"location not found in Glassdoor: {location}")
             # raise ValueError(f"Location '{location}' not found on Glassdoor")

-        return items;
+        return items

     def _add_payload(
         self,
@@ -351,9 +364,11 @@ class GlassdoorScraper(Scraper):
             fromage = max(self.scraper_input.hours_old // 24, 1)
         filter_params = []
         if self.scraper_input.easy_apply:
-            filter_params.append({"filterKey": "applicationType", "values": "1"})
+            filter_params.append(
+                {"filterKey": "applicationType", "values": "1"})
         if fromage:
-            filter_params.append({"filterKey": "fromAge", "values": str(fromage)})
+            filter_params.append(
+                {"filterKey": "fromAge", "values": str(fromage)})
         payload = {
             "operationName": "JobSearchResultsQuery",
             "variables": {
@@ -373,7 +388,8 @@ class GlassdoorScraper(Scraper):
         }
         if self.scraper_input.job_type:
             payload["variables"]["filterParams"].append(
-                {"filterKey": "jobType", "values": self.scraper_input.job_type.value[0]}
+                {"filterKey": "jobType",
+                 "values": self.scraper_input.job_type.value[0]}
             )
         return json.dumps([payload])
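One arithmetic consequence worth spelling out, assuming the hours_old=48 set in the main script above flows into scraper_input.hours_old: the fromAge filter built in _add_payload works in whole days, so 48 hours becomes a two-day window with a floor of one day.

# Worked example of the fromAge computation above; hours_old=48 mirrors
# the value configured in the main script in this commit.
hours_old = 48
fromage = max(hours_old // 24, 1)  # 48 // 24 == 2 days; never below 1

filter_params = []
if fromage:
    filter_params.append({"filterKey": "fromAge", "values": str(fromage)})
print(filter_params)  # [{'filterKey': 'fromAge', 'values': '2'}]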

View File

@@ -71,6 +71,7 @@ class GoozaliScraper(Scraper):
                 return JobResponse(jobs=job_list)
         except Exception as e:
             logger.error(f"Exception: {str(e)}")
+            return JobResponse(jobs=job_list)
         # model the response with models
         goozali_response = self.mapper.map_response_to_goozali_response(
             response=response)
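The single added line here is load-bearing: without the early return, control falls out of the except block and into the mapping code below, where response was never assigned, so the original request failure would be followed by an UnboundLocalError. A reduced sketch of that control flow, with fetch() and map_response() as hypothetical stand-ins for the session call and mapper used in this method:

def fetch() -> dict:
    raise TimeoutError("simulated request failure")

def map_response(response: dict) -> list:
    return response.get("jobs", [])

def scrape() -> list:
    job_list: list = []
    try:
        response = fetch()
    except Exception as e:
        print(f"Exception: {e}")
        return job_list  # early return: 'response' is unbound past this point
    return map_response(response)

print(scrape())  # logs the exception, then returns []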