diff --git a/src/db/job_repository.py b/src/db/job_repository.py
index 9071331..0f7fc86 100644
--- a/src/db/job_repository.py
+++ b/src/db/job_repository.py
@@ -1,5 +1,3 @@
-from typing import List
-
 from dotenv import load_dotenv
 from pymongo import UpdateOne
 
@@ -31,13 +29,14 @@ class JobRepository:
             self.collection.insert_one(job_dict)
             self.logger.info(f"Inserted new job with title {job.title}.")
 
-    def insertManyIfNotFound(self, jobs: List[JobPost]) -> List[JobPost]:
+    def insert_many_if_not_found(self, jobs: list[JobPost]) -> tuple[list[JobPost], list[JobPost]]:
         """
         Perform bulk upserts for a list of JobPost objects into a MongoDB collection.
         Only insert new jobs and return the list of newly inserted jobs.
         """
         operations = []
         new_jobs = []  # List to store the new jobs inserted into MongoDB
+        old_jobs = []  # List to store the jobs that already existed in MongoDB
         for job in jobs:
             job_dict = job.model_dump(exclude={"date_posted"})
             operations.append(
@@ -60,6 +59,7 @@ class JobRepository:
         for i, job in enumerate(jobs):
             if result.upserted_count > 0 and i < result.upserted_count:
                 new_jobs.append(job)
-                self.logger.info(f"New Job ID: {job.id}, Label: {job.title}")
+            else:
+                old_jobs.append(job)
 
-        return new_jobs
+        return old_jobs, new_jobs
diff --git a/src/jobspy/__init__.py b/src/jobspy/__init__.py
index 22ebf31..60980db 100644
--- a/src/jobspy/__init__.py
+++ b/src/jobspy/__init__.py
@@ -29,30 +29,30 @@ from .scrapers.exceptions import (
 
 
 def scrape_jobs(
-        site_name: str | list[str] | Site | list[Site] | None = None,
-        search_term: str | None = None,
-        google_search_term: str | None = None,
-        location: str | None = None,
-        locations: list[str] | None = None,
-        distance: int | None = 50,
-        is_remote: bool = False,
-        job_type: str | None = None,
-        easy_apply: bool | None = None,
-        results_wanted: int = 15,
-        country_indeed: str = "usa",
-        hyperlinks: bool = False,
-        proxies: list[str] | str | None = None,
-        ca_cert: str | None = None,
-        description_format: str = "markdown",
-        linkedin_fetch_description: bool | None = False,
-        linkedin_company_ids: list[int] | None = None,
-        offset: int | None = 0,
-        hours_old: int = None,
-        enforce_annual_salary: bool = False,
-        verbose: int = 2,
-        filter_by_title:list[str] = None,
-        ** kwargs,
-) -> list[JobPost]:
+    site_name: str | list[str] | Site | list[Site] | None = None,
+    search_term: str | None = None,
+    google_search_term: str | None = None,
+    location: str | None = None,
+    locations: list[str] | None = None,
+    distance: int | None = 50,
+    is_remote: bool = False,
+    job_type: str | None = None,
+    easy_apply: bool | None = None,
+    results_wanted: int = 15,
+    country_indeed: str = "usa",
+    hyperlinks: bool = False,
+    proxies: list[str] | str | None = None,
+    ca_cert: str | None = None,
+    description_format: str = "markdown",
+    linkedin_fetch_description: bool | None = False,
+    linkedin_company_ids: list[int] | None = None,
+    offset: int | None = 0,
+    hours_old: int = None,
+    enforce_annual_salary: bool = False,
+    verbose: int = 2,
+    filter_by_title: list[str] = None,
+    **kwargs,
+) -> tuple[list[JobPost], list[JobPost]]:
     """
     Simultaneously scrapes job data from multiple job sites.
     :return: pandas dataframe containing job data
@@ -151,13 +151,29 @@ def scrape_jobs(
         except Exception as e:
             logger.error(f"Future Error occurred: {e}")
 
-    def filter_jobs_by_title_name(job: JobPost):
-        for filter_title in filter_by_title:
-            if re.search(filter_title, job.title, re.IGNORECASE):
-                logger.info(f"job filtered out by title: {job.id} , {
-                            job.title} , found {filter_title}")
-                return False
+    def filter_jobs_by_title_name(jobs: list[JobPost], filter_by_title: list[str]) -> tuple[list, list]:
+        """
+        Filters jobs based on title names and returns two lists: filtered and remaining jobs.
 
-        return True
+        Args:
+            jobs: A list of JobPost objects.
+            filter_by_title: A list of strings representing titles to filter out.
 
-    return list(filter(filter_jobs_by_title_name, merged_jobs))
+        Returns:
+            A tuple containing two lists:
+            - The first list contains JobPost objects that were filtered out.
+            - The second list contains JobPost objects that remain after filtering.
+        """
+        filtered_jobs = []
+        remaining_jobs = []
+        for job in jobs:
+            for filter_title in filter_by_title:
+                if re.search(filter_title, job.title, re.IGNORECASE):
+                    logger.info(f"job filtered out by title: {job.id} , {job.title} , found {filter_title}")
+                    filtered_jobs.append(job)
+                    break  # Exit inner loop once a match is found for the job
+            else:
+                remaining_jobs.append(job)
+        return filtered_jobs, remaining_jobs
+
+    return filter_jobs_by_title_name(merged_jobs, filter_by_title)
diff --git a/src/telegram_handler/telegram_default_handler.py b/src/telegram_handler/telegram_default_handler.py
index 60a21cc..887da92 100644
--- a/src/telegram_handler/telegram_default_handler.py
+++ b/src/telegram_handler/telegram_default_handler.py
@@ -5,7 +5,7 @@ from telegram.ext import (
 )
 
 from db.job_repository import JobRepository
-from jobspy import Site, scrape_jobs
+from jobspy import Site, scrape_jobs, JobPost
 from jobspy.scrapers.utils import create_logger
 from telegram_bot import TelegramBot
 from telegram_handler.telegram_handler import TelegramHandler
@@ -25,14 +25,19 @@ class TelegramDefaultHandler(TelegramHandler):
         else:
             self.logger = create_logger("TelegramAllHandler")
 
+    async def send_old_job(self, old_jobs: list[JobPost]):
+
+        pass
+
     async def handle(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
         self.logger.info("start handling")
         await self.telegram_bot.set_message_reaction(
             update.message.message_id, ReactionEmoji.FIRE)
         site_names = [site.name for site in self.sites_to_scrap]
+        site_names_print = ", ".join(site_names)
         await self.telegram_bot.send_text(
-            f"Start scarping: {", ".join(site_names)}")
-        jobs = scrape_jobs(
+            f"Start scraping: {site_names_print}")
+        filtered_out_jobs, jobs = scrape_jobs(
             site_name=self.sites_to_scrap,
             search_term=self.search_term,
             locations=self.locations,
@@ -41,9 +46,14 @@ class TelegramDefaultHandler(TelegramHandler):
             filter_by_title=self.title_filters
         )
         self.logger.info(f"Found {len(jobs)} jobs")
-        new_jobs = self.jobRepository.insertManyIfNotFound(jobs)
+        self.jobRepository.insert_many_if_not_found(filtered_out_jobs)
+        old_jobs, new_jobs = self.jobRepository.insert_many_if_not_found(jobs)
         for newJob in new_jobs:
             await self.telegram_bot.send_job(newJob)
+        filtered_by_title = [job.title for job in filtered_out_jobs]
+        result_string = "filtered by title:\n" + "\n".join(filtered_by_title)
+        await self.telegram_bot.send_text(result_string)
+        self.logger.info(f"Found {len(old_jobs)} old jobs")
         await self.telegram_bot.send_text(
-            f"Finished scarping: {self.sites_to_scrap[0].name}")
+            f"Finished scraping: {site_names_print}")
         self.logger.info("finished handling")
diff --git a/src/telegram_handler/telegram_indeed_handler.py b/src/telegram_handler/telegram_indeed_handler.py
index daee135..79ad52a 100644
--- a/src/telegram_handler/telegram_indeed_handler.py
+++ b/src/telegram_handler/telegram_indeed_handler.py
@@ -23,7 +23,7 @@ class TelegramIndeedHandler(TelegramHandler):
 
     async def handle(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
         self.logger.info("start handling")
-        jobs = scrape_jobs(
+        filtered_out_jobs, jobs = scrape_jobs(
             site_name=self.sites_to_scrap,
             search_term=self.search_term,
             locations=self.locations,
@@ -33,7 +33,7 @@ class TelegramIndeedHandler(TelegramHandler):
             filter_by_title=self.title_filters
         )
         self.logger.info(f"Found {len(jobs)} jobs")
-        new_jobs = self.jobRepository.insertManyIfNotFound(jobs)
+        _, new_jobs = self.jobRepository.insert_many_if_not_found(jobs)
         for newJob in new_jobs:
             await self.telegramBot.send_job(newJob)
         self.logger.info("finished handling")
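
For reference, a minimal sketch of how callers are expected to consume the new tuple returns (not part of the patch; notify_new_jobs, repository and bot are placeholder names for existing JobRepository and TelegramBot instances, and all argument values are illustrative):

    from jobspy import Site, scrape_jobs

    async def notify_new_jobs(repository, bot):
        # Mirrors the flow in telegram_default_handler.py:
        # scrape_jobs now returns (filtered_out_jobs, remaining_jobs).
        filtered_out_jobs, jobs = scrape_jobs(
            site_name=[Site.LINKEDIN, Site.GLASSDOOR],   # illustrative sites
            search_term="python developer",              # illustrative search term
            filter_by_title=["senior", "staff"],         # title patterns to drop from results
        )
        # insert_many_if_not_found now returns (already-stored jobs, newly inserted jobs).
        old_jobs, new_jobs = repository.insert_many_if_not_found(jobs)
        for job in new_jobs:
            await bot.send_job(job)  # only previously unseen jobs are pushed to Telegram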