diff --git a/src/jobspy/db/__init__.py b/src/jobspy/db/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/src/jobspy/db/job_repository.py b/src/jobspy/db/job_repository.py
new file mode 100644
index 0000000..a8b2cf2
--- /dev/null
+++ b/src/jobspy/db/job_repository.py
@@ -0,0 +1,68 @@
+import os
+from typing import List
+from pymongo import MongoClient, UpdateOne
+
+from jobspy.jobs import JobPost
+
+
+class JobRepository:
+
+    def __init__(self):
+        self.mongoUri = os.getenv("MONGO_URI")
+        # Connect to MongoDB server
+        self.client = MongoClient(self.mongoUri)
+        # Access a database (it will be created automatically if it doesn't exist)
+        self.db = self.client["jobs_database"]
+        # Access a collection
+        self.collection = self.db["jobs"]
+
+    def insert_or_update_job(self, job: JobPost):
+        # Convert JobPost to dictionary
+        job_dict = job.model_dump(exclude={"date_posted"})
+
+        # Check if the job already exists by its ID
+        if job.id:
+            # If it exists, update the `updated_at` field and other fields
+            # job_dict['updated_at'] = datetime.utcnow() # Set updated time to current time
+            self.collection.update_one(
+                {'_id': job.id},
+                {'$set': job_dict}
+            )
+            print(f"Updated job with ID {job.id}.")
+        else:
+            # If it doesn't exist, insert a new job with the current `created_at` and `updated_at`
+            self.collection.insert_one(job_dict)
+            print(f"Inserted new job with title {job.title}.")
+
+    def insertManyIfNotFound(self, jobs: List[JobPost]) -> List[JobPost]:
+        """
+        Perform bulk upserts for a list of JobPost objects into a MongoDB collection.
+        Only insert new jobs and return the list of newly inserted jobs. 
+ """ + operations = [] + new_jobs = [] # List to store the new jobs inserted into MongoDB + for job in jobs: + job_dict = job.model_dump(exclude={"date_posted"}) + operations.append( + UpdateOne( + {"id": job.id}, # Match by `id` + # Only set fields if the job is being inserted (not updated) + {"$setOnInsert": job_dict}, + upsert=True # Insert if not found, but do not update if already exists + ) + ) + + if operations: + # Execute all operations in bulk + result = self.collection.bulk_write(operations) + print(f"Matched: {result.matched_count}, Upserts: { + result.upserted_count}, Modified: {result.modified_count}") + + # Get the newly inserted jobs (those that were upserted) + # The `upserted_count` corresponds to how many new documents were inserted + for i, job in enumerate(jobs): + if result.upserted_count > 0 and i < result.upserted_count: + new_jobs.append(job) + print(f"New Job ID: {job.id}, Label: {job.title}") + + return new_jobs diff --git a/src/jobspy/main.py b/src/jobspy/main.py new file mode 100644 index 0000000..4af0723 --- /dev/null +++ b/src/jobspy/main.py @@ -0,0 +1,34 @@ +import asyncio +from db.job_repository import JobRepository +from jobspy import scrape_jobs +from jobspy.telegram_bot import TelegramBot + + +async def main(): + telegramBot = TelegramBot() + jobRepository = JobRepository() + + jobs = scrape_jobs( + # site_name=["indeed", "linkedin", "zip_recruiter", "glassdoor", "google"], + site_name=["linkedin"], + search_term="software engineer", + google_search_term="software engineer jobs near Tel Aviv Israel since yesterday", + location="Central, Israel", + locations=["Rehovot"], + # locations=["Tel Aviv, Israel","Ramat Gan, Israel","Central, Israel","Rehovot ,Israel"], + results_wanted=5, + hours_old=200, + country_indeed='israel', + ) + print(f"Found {len(jobs)} jobs") + + for job in jobs: + jobRepository.insert_or_update_job(job) + + # new_jobs = jobRepository.insertManyIfNotFound(jobs, jobs_collection) + + # for new_job in new_jobs: + 
# await telegramBot.send_job(new_job) + # Run the async main function +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/jobspy/scrapers/linkedin/__init__.py b/src/jobspy/scrapers/linkedin/__init__.py index c3629f6..5c3174d 100644 --- a/src/jobspy/scrapers/linkedin/__init__.py +++ b/src/jobspy/scrapers/linkedin/__init__.py @@ -91,7 +91,7 @@ class LinkedInScraper(Scraper): ) params = { "keywords": scraper_input.search_term, - "location": scraper_input.location, + "location": ",".join(scraper_input.locations), "distance": scraper_input.distance, "f_WT": 2 if scraper_input.is_remote else None, "f_JT": ( @@ -224,6 +224,7 @@ class LinkedInScraper(Scraper): company_url=company_url, location=location, date_posted=date_posted, + datetime_posted=date_posted, job_url=f"{self.base_url}/jobs/view/{job_id}", compensation=compensation, job_type=job_details.get("job_type"), diff --git a/src/jobspy/telegram_bot.py b/src/jobspy/telegram_bot.py new file mode 100644 index 0000000..60b19a7 --- /dev/null +++ b/src/jobspy/telegram_bot.py @@ -0,0 +1,31 @@ +import os +from dotenv import load_dotenv +from telegram import Bot + +from jobspy.jobs import JobPost + +load_dotenv() + + +class TelegramBot: + + def __init__(self): + self._api_token = os.getenv("TELEGRAM_API_TOKEN") + self.chatId = os.getenv("TELEGRAM_CHAT_ID") + self.bot = Bot(token=self._api_token) + + async def send_job(self, job: JobPost): + """ + Send JobPost details to Telegram chat. 
+ """ + message = f"New Job Posted:\n\n" \ + f"Job ID: {job.id}\n" \ + f"Job Title: {job.title}\n" \ + f"Company: {job.company_name}\n" \ + f"Location: {job.location}\n" \ + f"Link: {job.job_url}\n" + try: + await self.bot.sendMessage(chat_id=self.chatId, text=message) + print(f"Sent job to Telegram: {job.id}") + except Exception as e: + print(f"Failed to send job to Telegram: {e}") diff --git a/src/main.py b/src/main.py deleted file mode 100644 index 477e003..0000000 --- a/src/main.py +++ /dev/null @@ -1,96 +0,0 @@ -import asyncio -import os -from dotenv import load_dotenv -from typing import List - -from pymongo import MongoClient, UpdateOne -from telegram import Bot - -from jobspy import scrape_jobs -from jobspy.jobs import JobPost -# Load the .env file -load_dotenv() - -TELEGRAM_API_TOKEN = os.getenv("TELEGRAM_API_TOKEN") -CHAT_ID = os.getenv("TELEGRAM_CHAT_ID") -MONGO_URI = os.getenv("MONGO_URI") -# Connect to MongoDB server -client = MongoClient(MONGO_URI) -# Access a database (it will be created automatically if it doesn't exist) -db = client["jobs_database"] -# Access a collection -jobs_collection = db["jobs"] -# Initialize the Telegram bot -bot = Bot(token=TELEGRAM_API_TOKEN) - -async def send_job_to_telegram(job:JobPost): - """ - Send job details to Telegram chat. - """ - message = f"New Job Posted:\n\n" \ - f"Job ID: {job.id}\n" \ - f"Job Title: {job.title}\n" \ - f"Company: {job.company_name}\n" \ - f"Location: {job.location}\n" \ - f"Link: {job.job_url}\n" - try: - await bot.sendMessage(chat_id=CHAT_ID, text=message) - print(f"Sent job to Telegram: {job.id}") - except Exception as e: - print(f"Failed to send job to Telegram: {e}") - -def insert_jobs(jobs: List[JobPost], collection): - """ - Perform bulk upserts for a list of JobPost objects into a MongoDB collection. - Only insert new jobs and return the list of newly inserted jobs. 
- """ - operations = [] - new_jobs = [] # List to store the new jobs inserted into MongoDB - - for job in jobs: - job_dict = job.model_dump(exclude={"date_posted"}) - operations.append( - UpdateOne( - {"id": job.id}, # Match by `id` - {"$setOnInsert": job_dict}, # Only set fields if the job is being inserted (not updated) - upsert=True # Insert if not found, but do not update if already exists - ) - ) - - if operations: - # Execute all operations in bulk - result = collection.bulk_write(operations) - print(f"Matched: {result.matched_count}, Upserts: {result.upserted_count}, Modified: {result.modified_count}") - - # Get the newly inserted jobs (those that were upserted) - # The `upserted_count` corresponds to how many new documents were inserted - for i, job in enumerate(jobs): - if result.upserted_count > 0 and i < result.upserted_count: - new_jobs.append(job) - print(f"New Job ID: {job.id}, Label: {job.title}") - - return new_jobs - -async def main(): - - jobs = scrape_jobs( - # site_name=["indeed", "linkedin", "zip_recruiter", "glassdoor", "google"], - site_name=["glassdoor"], - search_term="software engineer", - google_search_term="software engineer jobs near Tel Aviv Israel since yesterday", - location="Central, Israel", - locations=["Ramat Gan, Israel"], - # locations=["Tel Aviv, Israel","Ramat Gan, Israel","Central, Israel","Rehovot ,Israel"], - results_wanted=50, - hours_old=200, - country_indeed='israel', - ) - print(f"Found {len(jobs)} jobs") - - new_jobs = insert_jobs(jobs, jobs_collection) - - for new_job in new_jobs: - await send_job_to_telegram(new_job) - # Run the async main function -if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file diff --git a/tests/test_db.py b/tests/test_db.py new file mode 100644 index 0000000..1e40fca --- /dev/null +++ b/tests/test_db.py @@ -0,0 +1,8 @@ +from jobspy.db.job_repository import JobRepository +from tests.test_util import createMockJob + + +def insert_job(): + jobRepository = 
JobRepository() + job = createMockJob() + jobRepository.insert_or_update_job(job) diff --git a/tests/test_linkedin.py b/tests/test_linkedin.py index 080f4b8..0cb5ec4 100644 --- a/tests/test_linkedin.py +++ b/tests/test_linkedin.py @@ -3,7 +3,8 @@ import pandas as pd def test_linkedin(): - result = scrape_jobs(site_name="linkedin", search_term="engineer", results_wanted=5) + result = scrape_jobs(site_name="linkedin", + search_term="engineer", results_wanted=5) assert ( isinstance(result, pd.DataFrame) and len(result) == 5 ), "Result should be a non-empty DataFrame" diff --git a/tests/test_util.py b/tests/test_util.py new file mode 100644 index 0000000..74b33f1 --- /dev/null +++ b/tests/test_util.py @@ -0,0 +1,88 @@ +from datetime import datetime, date +from typing import List + +from telegram import Location + +from jobspy.jobs import Country, JobPost +# Creating some test job posts + + +def createMockJob() -> JobPost: + return JobPost( + id='li-4072458658', + title='Backend Developer', + company_name='Okoora', + job_url='https://www.linkedin.com/jobs/view/4072458658', + location=Location(country=Country.ISRAEL, + city='Ramat Gan', state='Tel Aviv District'), + description=None, + company_url='https://ch.linkedin.com/company/okoora', + date_posted=date(2024, 12, 9), + datetime_posted=datetime(2024, 12, 9) + ) + + +def createMockJob2() -> JobPost: + return JobPost( + id='li-4093541744', + title='Software Engineer', + company_name='Hyro', + job_url='https://www.linkedin.com/jobs/view/4093541744', + location=Location(country=Country.ISRAEL, + city='Tel Aviv-Yafo', state='Tel Aviv District'), + description=None, + company_url='https://www.linkedin.com/company/hyroai', + date_posted=date(2024, 12, 8), + datetime_posted=datetime(2024, 12, 8) + ) + + +def createMockJob3() -> JobPost: + return JobPost( + id='li-4090995419', + title='Frontend Developer', + company_name='Balance', + job_url='https://www.linkedin.com/jobs/view/4090995419', + 
location=Location(country=Country.WORLDWIDE, + city='Tel Aviv District', state='Israel'), + description=None, + company_url='https://www.linkedin.com/company/getbalance', + date_posted=date(2024, 12, 5), + datetime_posted=datetime(2024, 12, 5) + ) + + +def createMockJob4() -> JobPost: + return JobPost( + id='li-4090533760', + title='Backend Developer', + company_name='Vi', + job_url='https://www.linkedin.com/jobs/view/4090533760', + location=Location(country=Country.ISRAEL, + city='Tel Aviv-Yafo', state='Tel Aviv District'), + description=None, + company_url='https://www.linkedin.com/company/vi', + date_posted=date(2024, 12, 3), + datetime_posted=datetime(2024, 12, 3) + ) + + +def createMockJob5() -> JobPost: + return JobPost( + id='li-4074568220', + title='Backend .NET Developer', + company_name='Just Eat Takeaway.com', + job_url='https://www.linkedin.com/jobs/view/4074568220', + location=Location(country=Country.WORLDWIDE, + city='Tel Aviv District', state='Israel'), + description=None, + company_url='https://nl.linkedin.com/company/just-eat-takeaway-com', + date_posted=date(2024, 12, 6), + datetime_posted=datetime(2024, 12, 6) + ) + + +def createMockjobs() -> List[JobPost]: + + return [createMockJob(), createMockJob2(), createMockJob3(), + createMockJob4(), createMockJob5()]