Added some logs and messages to be clearer

pull/231/head
Yariv Menachem 2024-12-31 14:55:58 +02:00
parent 3e9511943a
commit f0f6cf044f
4 changed files with 70 additions and 44 deletions

View File

@@ -1,5 +1,3 @@
-from typing import List
 from dotenv import load_dotenv
 from pymongo import UpdateOne
@@ -31,13 +29,14 @@ class JobRepository:
         self.collection.insert_one(job_dict)
         self.logger.info(f"Inserted new job with title {job.title}.")

-    def insertManyIfNotFound(self, jobs: List[JobPost]) -> List[JobPost]:
+    def insert_many_if_not_found(self, jobs: list[JobPost]) -> tuple[list[JobPost], list[JobPost]]:
         """
         Perform bulk upserts for a list of JobPost objects into a MongoDB collection.
         Only insert new jobs and return the list of newly inserted jobs.
         """
         operations = []
         new_jobs = []  # List to store the new jobs inserted into MongoDB
+        old_jobs = []  # List to store the jobs that already existed in MongoDB
         for job in jobs:
             job_dict = job.model_dump(exclude={"date_posted"})
             operations.append(
@@ -60,6 +59,7 @@ class JobRepository:
         for i, job in enumerate(jobs):
             if result.upserted_count > 0 and i < result.upserted_count:
                 new_jobs.append(job)
-                self.logger.info(f"New Job ID: {job.id}, Label: {job.title}")
-        return new_jobs
+            else:
+                old_jobs.append(job)
+        return old_jobs, new_jobs
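A note on the upsert bookkeeping above: the check `i < result.upserted_count` assumes the first N operations in the batch are the ones that inserted new documents, which bulk_write does not guarantee. pymongo's BulkWriteResult also exposes `upserted_ids`, a dict mapping operation index to the new _id, which identifies the inserted jobs exactly. A minimal sketch of that alternative (not the committed code; same `jobs` and `operations` setup as above):

    result = self.collection.bulk_write(operations)
    upserted_indexes = set(result.upserted_ids)  # keys are the operation indexes that inserted a new document
    new_jobs = [job for i, job in enumerate(jobs) if i in upserted_indexes]
    old_jobs = [job for i, job in enumerate(jobs) if i not in upserted_indexes]
    return old_jobs, new_jobs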

View File

@@ -52,7 +52,7 @@ def scrape_jobs(
     verbose: int = 2,
     filter_by_title: list[str] = None,
     **kwargs,
-) -> list[JobPost]:
+) -> tuple[list[JobPost], list[JobPost]]:
     """
     Simultaneously scrapes job data from multiple job sites.
     :return: pandas dataframe containing job data
@@ -151,13 +151,29 @@ def scrape_jobs(
         except Exception as e:
             logger.error(f"Future Error occurred: {e}")

-    def filter_jobs_by_title_name(job: JobPost):
-        for filter_title in filter_by_title:
-            if re.search(filter_title, job.title, re.IGNORECASE):
-                logger.info(f"job filtered out by title: {job.id} , {
-                    job.title} , found {filter_title}")
-                return False
-        return True
-
-    return list(filter(filter_jobs_by_title_name, merged_jobs))
+    def filter_jobs_by_title_name(jobs: list[JobPost], filter_by_title: list[str]) -> tuple[list, list]:
+        """
+        Filters jobs based on title names and returns two lists: filtered and remaining jobs.
+
+        Args:
+            jobs: A list of JobPost objects.
+            filter_by_title: A list of strings representing titles to filter out.
+
+        Returns:
+            A tuple containing two lists:
+            - The first list contains JobPost objects that were filtered out.
+            - The second list contains JobPost objects that remain after filtering.
+        """
+        filtered_jobs = []
+        remaining_jobs = []
+        for job in jobs:
+            for filter_title in filter_by_title:
+                if re.search(filter_title, job.title, re.IGNORECASE):
+                    logger.info(f"job filtered out by title: {job.id}, {job.title}, found {filter_title}")
+                    filtered_jobs.append(job)
+                    break  # Exit the inner loop once a match is found for this job
+            else:
+                remaining_jobs.append(job)
+        return filtered_jobs, remaining_jobs
+
+    return filter_jobs_by_title_name(merged_jobs, filter_by_title)
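The rewritten filter relies on Python's `for ... else`: the `else` branch runs only when the inner loop finishes without hitting `break`, so every job lands in exactly one of the two lists. A self-contained sketch of the pattern (hypothetical titles and patterns, no JobPost dependency):

    import re

    titles = ["Senior QA Engineer", "Backend Developer", "QA Automation Lead"]
    patterns = ["qa", "automation"]

    filtered, remaining = [], []
    for title in titles:
        for pattern in patterns:
            if re.search(pattern, title, re.IGNORECASE):
                filtered.append(title)  # matched a filter pattern
                break                   # skip the remaining patterns
        else:
            remaining.append(title)     # no pattern matched this title

    print(filtered)   # ['Senior QA Engineer', 'QA Automation Lead']
    print(remaining)  # ['Backend Developer']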

View File

@@ -5,7 +5,7 @@ from telegram.ext import (
 )

 from db.job_repository import JobRepository
-from jobspy import Site, scrape_jobs
+from jobspy import Site, scrape_jobs, JobPost
 from jobspy.scrapers.utils import create_logger
 from telegram_bot import TelegramBot
 from telegram_handler.telegram_handler import TelegramHandler
@@ -25,14 +25,19 @@ class TelegramDefaultHandler(TelegramHandler):
         else:
             self.logger = create_logger("TelegramAllHandler")

+    async def send_old_job(self, old_jobs: list[JobPost]):
+        pass
+
     async def handle(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
         self.logger.info("start handling")
         await self.telegram_bot.set_message_reaction(
             update.message.message_id, ReactionEmoji.FIRE)
         site_names = [site.name for site in self.sites_to_scrap]
+        site_names_print = ", ".join(site_names)
         await self.telegram_bot.send_text(
-            f"Start scarping: {", ".join(site_names)}")
-        jobs = scrape_jobs(
+            f"Start scraping: {site_names_print}")
+        filtered_out_jobs, jobs = scrape_jobs(
             site_name=self.sites_to_scrap,
             search_term=self.search_term,
             locations=self.locations,
@@ -41,9 +46,14 @@ class TelegramDefaultHandler(TelegramHandler):
             filter_by_title=self.title_filters
         )
         self.logger.info(f"Found {len(jobs)} jobs")
-        new_jobs = self.jobRepository.insertManyIfNotFound(jobs)
+        self.jobRepository.insert_many_if_not_found(filtered_out_jobs)
+        old_jobs, new_jobs = self.jobRepository.insert_many_if_not_found(jobs)
         for newJob in new_jobs:
             await self.telegram_bot.send_job(newJob)
+        filtered_by_title = [job.title for job in filtered_out_jobs]
+        result_string = "filtered by title:\n" + "\n".join(filtered_by_title)
+        await self.telegram_bot.send_text(result_string)
+        self.logger.info(f"Found {len(old_jobs)} old jobs")
         await self.telegram_bot.send_text(
-            f"Finished scarping: {self.sites_to_scrap[0].name}")
+            f"Finished scraping: {site_names_print}")
         self.logger.info("finished handling")

View File

@@ -23,7 +23,7 @@ class TelegramIndeedHandler(TelegramHandler):
     async def handle(self, update: Update, context: ContextTypes.DEFAULT_TYPE):
         self.logger.info("start handling")
-        jobs = scrape_jobs(
+        filtered_out_jobs, jobs = scrape_jobs(
             site_name=self.sites_to_scrap,
             search_term=self.search_term,
             locations=self.locations,
@@ -33,7 +33,7 @@ class TelegramIndeedHandler(TelegramHandler):
             filter_by_title=self.title_filters
         )
         self.logger.info(f"Found {len(jobs)} jobs")
-        new_jobs = self.jobRepository.insertManyIfNotFound(jobs)
+        old_jobs, new_jobs = self.jobRepository.insert_many_if_not_found(jobs)
         for newJob in new_jobs:
             await self.telegramBot.send_job(newJob)
         self.logger.info("finished handling")