fixed file structure; moved the Telegram bot and job collection into separate classes

pull/231/head
Yariv Menachem 2024-12-11 16:26:23 +02:00
parent c6a7729481
commit f4ebcc2d51
9 changed files with 234 additions and 98 deletions

@@ -0,0 +1,69 @@
import os
from typing import List

from pymongo import MongoClient, UpdateOne

from jobspy.jobs import JobPost


class JobRepository:

    def __init__(self):
        self.mongoUri = os.getenv("MONGO_URI")
        # Connect to MongoDB server
        self.client = MongoClient(self.mongoUri)
        # Access a database (it will be created automatically if it doesn't exist)
        self.db = self.client["jobs_database"]
        # Access a collection
        self.collection = self.db["jobs"]

    def insert_or_update_job(self, job: JobPost):
        # Convert JobPost to dictionary
        job_dict = job.model_dump(exclude={"date_posted"})

        # Check if the job already exists by its ID
        if job.id:
            # If it exists, update its fields; otherwise insert it (upsert)
            # job_dict['updated_at'] = datetime.utcnow()  # Set updated time to current time
            self.collection.update_one(
                {'_id': job.id},
                {'$set': job_dict},
                upsert=True
            )
            print(f"Upserted job with ID {job.id}.")
        else:
            # Without an ID, simply insert a new document
            self.collection.insert_one(job_dict)
            print(f"Inserted new job with title {job.title}.")

    def insertManyIfNotFound(self, jobs: List[JobPost]) -> List[JobPost]:
        """
        Perform bulk upserts for a list of JobPost objects into a MongoDB collection.
        Only insert new jobs and return the list of newly inserted jobs.
        """
        operations = []
        new_jobs = []  # List to store the new jobs inserted into MongoDB
        for job in jobs:
            job_dict = job.model_dump(exclude={"date_posted"})
            operations.append(
                UpdateOne(
                    {"id": job.id},  # Match by `id`
                    # Only set fields if the job is being inserted (not updated)
                    {"$setOnInsert": job_dict},
                    upsert=True  # Insert if not found, but do not update if already exists
                )
            )

        if operations:
            # Execute all operations in bulk
            result = self.collection.bulk_write(operations)
            print(f"Matched: {result.matched_count}, Upserts: {result.upserted_count}, "
                  f"Modified: {result.modified_count}")

            # `upserted_ids` maps each operation index that produced an insert to the
            # new document's _id, so it identifies exactly which jobs are new
            for index in result.upserted_ids:
                new_job = jobs[index]
                new_jobs.append(new_job)
                print(f"New Job ID: {new_job.id}, Title: {new_job.title}")

        return new_jobs
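For context, a minimal usage sketch of the new repository class, assuming MONGO_URI points at a reachable MongoDB instance and that the module lives at jobspy.db.job_repository as the new test imports suggest (the connection string below is a placeholder, not taken from this commit):

# Usage sketch only: exercising JobRepository with the mock helpers from tests/test_util.py
import os
os.environ.setdefault("MONGO_URI", "mongodb://localhost:27017")  # placeholder URI, assumption

from jobspy.db.job_repository import JobRepository
from tests.test_util import createMockJob, createMockjobs

repo = JobRepository()
repo.insert_or_update_job(createMockJob())               # upsert a single JobPost by its id
new_jobs = repo.insertManyIfNotFound(createMockjobs())    # bulk upsert; returns only newly inserted posts
print([job.id for job in new_jobs])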

src/jobspy/main.py (new file)

@@ -0,0 +1,34 @@
import asyncio

from db.job_repository import JobRepository
from jobspy import scrape_jobs
from jobspy.telegram_bot import TelegramBot


async def main():
    telegramBot = TelegramBot()
    jobRepository = JobRepository()

    jobs = scrape_jobs(
        # site_name=["indeed", "linkedin", "zip_recruiter", "glassdoor", "google"],
        site_name=["linkedin"],
        search_term="software engineer",
        google_search_term="software engineer jobs near Tel Aviv Israel since yesterday",
        location="Central, Israel",
        locations=["Rehovot"],
        # locations=["Tel Aviv, Israel","Ramat Gan, Israel","Central, Israel","Rehovot ,Israel"],
        results_wanted=5,
        hours_old=200,
        country_indeed='israel',
    )
    print(f"Found {len(jobs)} jobs")

    for job in jobs:
        jobRepository.insert_or_update_job(job)

    # new_jobs = jobRepository.insertManyIfNotFound(jobs, jobs_collection)
    # for new_job in new_jobs:
    #     await telegramBot.send_job(new_job)


# Run the async main function
if __name__ == "__main__":
    asyncio.run(main())

@@ -91,7 +91,7 @@ class LinkedInScraper(Scraper):
         )
         params = {
             "keywords": scraper_input.search_term,
-            "location": scraper_input.location,
+            "location": ",".join(scraper_input.locations),
             "distance": scraper_input.distance,
             "f_WT": 2 if scraper_input.is_remote else None,
             "f_JT": (
@@ -224,6 +224,7 @@ class LinkedInScraper(Scraper):
             company_url=company_url,
             location=location,
             date_posted=date_posted,
+            datetime_posted=date_posted,
             job_url=f"{self.base_url}/jobs/view/{job_id}",
             compensation=compensation,
             job_type=job_details.get("job_type"),
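In the first hunk the location request parameter is now built by joining the new locations list; a quick illustration with made-up values:

# Illustration only (hypothetical values): how the new "location" request param is assembled
locations = ["Tel Aviv, Israel", "Ramat Gan, Israel"]
print(",".join(locations))  # -> Tel Aviv, Israel,Ramat Gan, Israel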

@@ -0,0 +1,31 @@
import os

from dotenv import load_dotenv
from telegram import Bot

from jobspy.jobs import JobPost

load_dotenv()


class TelegramBot:

    def __init__(self):
        self._api_token = os.getenv("TELEGRAM_API_TOKEN")
        self.chatId = os.getenv("TELEGRAM_CHAT_ID")
        self.bot = Bot(token=self._api_token)

    async def send_job(self, job: JobPost):
        """
        Send JobPost details to Telegram chat.
        """
        message = f"New Job Posted:\n\n" \
                  f"Job ID: {job.id}\n" \
                  f"Job Title: {job.title}\n" \
                  f"Company: {job.company_name}\n" \
                  f"Location: {job.location}\n" \
                  f"Link: {job.job_url}\n"
        try:
            await self.bot.sendMessage(chat_id=self.chatId, text=message)
            print(f"Sent job to Telegram: {job.id}")
        except Exception as e:
            print(f"Failed to send job to Telegram: {e}")

@@ -1,96 +0,0 @@
import asyncio
import os
from dotenv import load_dotenv
from typing import List
from pymongo import MongoClient, UpdateOne
from telegram import Bot

from jobspy import scrape_jobs
from jobspy.jobs import JobPost

# Load the .env file
load_dotenv()

TELEGRAM_API_TOKEN = os.getenv("TELEGRAM_API_TOKEN")
CHAT_ID = os.getenv("TELEGRAM_CHAT_ID")
MONGO_URI = os.getenv("MONGO_URI")

# Connect to MongoDB server
client = MongoClient(MONGO_URI)
# Access a database (it will be created automatically if it doesn't exist)
db = client["jobs_database"]
# Access a collection
jobs_collection = db["jobs"]
# Initialize the Telegram bot
bot = Bot(token=TELEGRAM_API_TOKEN)


async def send_job_to_telegram(job: JobPost):
    """
    Send job details to Telegram chat.
    """
    message = f"New Job Posted:\n\n" \
              f"Job ID: {job.id}\n" \
              f"Job Title: {job.title}\n" \
              f"Company: {job.company_name}\n" \
              f"Location: {job.location}\n" \
              f"Link: {job.job_url}\n"
    try:
        await bot.sendMessage(chat_id=CHAT_ID, text=message)
        print(f"Sent job to Telegram: {job.id}")
    except Exception as e:
        print(f"Failed to send job to Telegram: {e}")


def insert_jobs(jobs: List[JobPost], collection):
    """
    Perform bulk upserts for a list of JobPost objects into a MongoDB collection.
    Only insert new jobs and return the list of newly inserted jobs.
    """
    operations = []
    new_jobs = []  # List to store the new jobs inserted into MongoDB
    for job in jobs:
        job_dict = job.model_dump(exclude={"date_posted"})
        operations.append(
            UpdateOne(
                {"id": job.id},  # Match by `id`
                {"$setOnInsert": job_dict},  # Only set fields if the job is being inserted (not updated)
                upsert=True  # Insert if not found, but do not update if already exists
            )
        )

    if operations:
        # Execute all operations in bulk
        result = collection.bulk_write(operations)
        print(f"Matched: {result.matched_count}, Upserts: {result.upserted_count}, Modified: {result.modified_count}")

        # Get the newly inserted jobs (those that were upserted)
        # The `upserted_count` corresponds to how many new documents were inserted
        for i, job in enumerate(jobs):
            if result.upserted_count > 0 and i < result.upserted_count:
                new_jobs.append(job)
                print(f"New Job ID: {job.id}, Label: {job.title}")

    return new_jobs


async def main():
    jobs = scrape_jobs(
        # site_name=["indeed", "linkedin", "zip_recruiter", "glassdoor", "google"],
        site_name=["glassdoor"],
        search_term="software engineer",
        google_search_term="software engineer jobs near Tel Aviv Israel since yesterday",
        location="Central, Israel",
        locations=["Ramat Gan, Israel"],
        # locations=["Tel Aviv, Israel","Ramat Gan, Israel","Central, Israel","Rehovot ,Israel"],
        results_wanted=50,
        hours_old=200,
        country_indeed='israel',
    )
    print(f"Found {len(jobs)} jobs")

    new_jobs = insert_jobs(jobs, jobs_collection)
    for new_job in new_jobs:
        await send_job_to_telegram(new_job)


# Run the async main function
if __name__ == "__main__":
    asyncio.run(main())

tests/test_db.py (new file)

@@ -0,0 +1,8 @@
from jobspy.db.job_repository import JobRepository
from tests.test_util import createMockJob


def insert_job():
    jobRepository = JobRepository()
    job = createMockJob()
    jobRepository.insert_or_update_job(job)

@@ -3,7 +3,8 @@ import pandas as pd
 def test_linkedin():
-    result = scrape_jobs(site_name="linkedin", search_term="engineer", results_wanted=5)
+    result = scrape_jobs(site_name="linkedin",
+                         search_term="engineer", results_wanted=5)
     assert (
         isinstance(result, pd.DataFrame) and len(result) == 5
     ), "Result should be a non-empty DataFrame"

tests/test_util.py (new file)

@@ -0,0 +1,88 @@
from datetime import datetime, date
from typing import List

from jobspy.jobs import Country, JobPost, Location

# Creating some test job posts
def createMockJob() -> JobPost:
    return JobPost(
        id='li-4072458658',
        title='Backend Developer',
        company_name='Okoora',
        job_url='https://www.linkedin.com/jobs/view/4072458658',
        location=Location(country=Country.ISRAEL,
                          city='Ramat Gan', state='Tel Aviv District'),
        description=None,
        company_url='https://ch.linkedin.com/company/okoora',
        date_posted=date(2024, 12, 9),
        datetime_posted=datetime(2024, 12, 9)
    )


def createMockJob2() -> JobPost:
    return JobPost(
        id='li-4093541744',
        title='Software Engineer',
        company_name='Hyro',
        job_url='https://www.linkedin.com/jobs/view/4093541744',
        location=Location(country=Country.ISRAEL,
                          city='Tel Aviv-Yafo', state='Tel Aviv District'),
        description=None,
        company_url='https://www.linkedin.com/company/hyroai',
        date_posted=date(2024, 12, 8),
        datetime_posted=datetime(2024, 12, 8)
    )


def createMockJob3() -> JobPost:
    return JobPost(
        id='li-4090995419',
        title='Frontend Developer',
        company_name='Balance',
        job_url='https://www.linkedin.com/jobs/view/4090995419',
        location=Location(country=Country.WORLDWIDE,
                          city='Tel Aviv District', state='Israel'),
        description=None,
        company_url='https://www.linkedin.com/company/getbalance',
        date_posted=date(2024, 12, 5),
        datetime_posted=datetime(2024, 12, 5)
    )


def createMockJob4() -> JobPost:
    return JobPost(
        id='li-4090533760',
        title='Backend Developer',
        company_name='Vi',
        job_url='https://www.linkedin.com/jobs/view/4090533760',
        location=Location(country=Country.ISRAEL,
                          city='Tel Aviv-Yafo', state='Tel Aviv District'),
        description=None,
        company_url='https://www.linkedin.com/company/vi',
        date_posted=date(2024, 12, 3),
        datetime_posted=datetime(2024, 12, 3)
    )


def createMockJob5() -> JobPost:
    return JobPost(
        id='li-4074568220',
        title='Backend .NET Developer',
        company_name='Just Eat Takeaway.com',
        job_url='https://www.linkedin.com/jobs/view/4074568220',
        location=Location(country=Country.WORLDWIDE,
                          city='Tel Aviv District', state='Israel'),
        description=None,
        company_url='https://nl.linkedin.com/company/just-eat-takeaway-com',
        date_posted=date(2024, 12, 6),
        datetime_posted=datetime(2024, 12, 6)
    )


def createMockjobs() -> List[JobPost]:
    return [createMockJob(), createMockJob2(), createMockJob3(),
            createMockJob4(), createMockJob5()]
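
These helpers could also back a repository-level test; a possible sketch, assuming MONGO_URI points at a disposable test database:

# Sketch only: re-inserting the same mock jobs should return no new posts
from jobspy.db.job_repository import JobRepository
from tests.test_util import createMockjobs


def test_insert_many_if_not_found_is_idempotent():
    repo = JobRepository()
    jobs = createMockjobs()
    repo.insertManyIfNotFound(jobs)               # first pass may insert new documents
    assert repo.insertManyIfNotFound(jobs) == []  # second pass should insert nothing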