From cc782ddfc1758db251d8bebb630a0e105e1209f0 Mon Sep 17 00:00:00 2001
From: Yariv Menachem
Date: Tue, 10 Dec 2024 19:29:34 +0200
Subject: [PATCH] Aligned job handling and added the Telegram token and chat ID

---
 src/jobspy/__init__.py |  59 +++--------------------
 src/main.py            | 105 ++++++++++++++++++++++++++++++++++-------
 2 files changed, 94 insertions(+), 70 deletions(-)

diff --git a/src/jobspy/__init__.py b/src/jobspy/__init__.py
index 4e4efec..c0a6a90 100644
--- a/src/jobspy/__init__.py
+++ b/src/jobspy/__init__.py
@@ -2,11 +2,9 @@ from __future__ import annotations
 
 from datetime import datetime
 import pandas as pd
-from typing import List, Tuple
+from typing import Tuple
 from concurrent.futures import ThreadPoolExecutor, as_completed
 
-from pymongo import MongoClient, UpdateOne
-
 from .jobs import JobPost, JobType, Location
 from .scrapers.utils import set_logger_level, extract_salary, create_logger
 from .scrapers.indeed import IndeedScraper
@@ -22,14 +20,6 @@ from .scrapers.exceptions import (
     GlassdoorException,
     GoogleJobsException,
 )
-# Connect to MongoDB server
-client = MongoClient("mongodb://localhost:27017/")
-
-# Access a database (it will be created automatically if it doesn't exist)
-db = client["jobs_database"]
-
-# Access a collection
-jobs_collection = db["jobs"]
 
 def scrape_jobs(
     site_name: str | list[str] | Site | list[Site] | None = None,
@@ -112,59 +102,23 @@ def scrape_jobs(
         offset=offset,
         hours_old=hours_old,
     )
-
-    # def insert_jobs(jobs: List[JobPost], collection):
-    #     # Convert JobPost objects to dictionaries
-    #     # job_dicts = [job.model_dump() for job in jobs]
-    #     job_dicts = [job.model_dump(exclude={"date_posted"}) for job in jobs]
-    #     collection.insert_many(job_dicts)
-    #     print(f"Inserted {len(job_dicts)} jobs into MongoDB.")
-    def insert_jobs(jobs: List[JobPost], collection):
-        """
-        Perform bulk upserts for a list of JobPost objects into a MongoDB collection.
-        Only insert new jobs and return the list of newly inserted jobs.
-        """
-        operations = []
-        new_jobs = []  # List to store the new jobs inserted into MongoDB
-
-        for job in jobs:
-            job_dict = job.model_dump(exclude={"date_posted"})
-            operations.append(
-                UpdateOne(
-                    {"id": job.id},  # Match by `id`
-                    {"$setOnInsert": job_dict},  # Only set fields if the job is being inserted (not updated)
-                    upsert=True  # Insert if not found, but do not update if already exists
-                )
-            )
-
-        if operations:
-            # Execute all operations in bulk
-            result = collection.bulk_write(operations)
-            print(f"Matched: {result.matched_count}, Upserts: {result.upserted_count}, Modified: {result.modified_count}")
-
-            # Get the newly inserted jobs (those that were upserted)
-            # The `upserted_count` corresponds to how many new documents were inserted
-            for i, job in enumerate(jobs):
-                if result.upserted_count > 0 and i < result.upserted_count:
-                    new_jobs.append(job)
-                    print(f"New Job ID: {job.id}, Label: {job.label}")
-
-        return new_jobs
 
     def scrape_site(site: Site) -> Tuple[str, JobResponse]:
         scraper_class = SCRAPER_MAPPING[site]
         scraper = scraper_class(proxies=proxies, ca_cert=ca_cert)
         scraped_data: JobResponse = scraper.scrape(scraper_input)
-        insert_jobs(scraped_data.jobs, jobs_collection)
         cap_name = site.value.capitalize()
         site_name = "ZipRecruiter" if cap_name == "Zip_recruiter" else cap_name
         create_logger(site_name).info(f"finished scraping")
         return site.value, scraped_data
 
     site_to_jobs_dict = {}
-
+    merged_jobs: list[JobPost] = []
 
     def worker(site):
         site_val, scraped_info = scrape_site(site)
+        # Add the scraped jobs to the merged list
+        merged_jobs.extend(scraped_info.jobs)  # Assuming scraped_info has 'jobs' as a list
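+        # NOTE: no lock is needed around this; in CPython the GIL makes
+        # list.extend on a built-in list atomic, so worker threads can
+        # safely share merged_jobs (assumes CPython as the interpreter)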
- """ - operations = [] - new_jobs = [] # List to store the new jobs inserted into MongoDB - - for job in jobs: - job_dict = job.model_dump(exclude={"date_posted"}) - operations.append( - UpdateOne( - {"id": job.id}, # Match by `id` - {"$setOnInsert": job_dict}, # Only set fields if the job is being inserted (not updated) - upsert=True # Insert if not found, but do not update if already exists - ) - ) - - if operations: - # Execute all operations in bulk - result = collection.bulk_write(operations) - print(f"Matched: {result.matched_count}, Upserts: {result.upserted_count}, Modified: {result.modified_count}") - - # Get the newly inserted jobs (those that were upserted) - # The `upserted_count` corresponds to how many new documents were inserted - for i, job in enumerate(jobs): - if result.upserted_count > 0 and i < result.upserted_count: - new_jobs.append(job) - print(f"New Job ID: {job.id}, Label: {job.label}") - - return new_jobs def scrape_site(site: Site) -> Tuple[str, JobResponse]: scraper_class = SCRAPER_MAPPING[site] scraper = scraper_class(proxies=proxies, ca_cert=ca_cert) scraped_data: JobResponse = scraper.scrape(scraper_input) - insert_jobs(scraped_data.jobs, jobs_collection) cap_name = site.value.capitalize() site_name = "ZipRecruiter" if cap_name == "Zip_recruiter" else cap_name create_logger(site_name).info(f"finished scraping") return site.value, scraped_data site_to_jobs_dict = {} - + merged_jobs:list[JobPost] = [] def worker(site): site_val, scraped_info = scrape_site(site) + # Add the scraped jobs to the merged list + merged_jobs.extend(scraped_info.jobs) # Assuming scraped_info has 'jobs' as a list + return site_val, scraped_info with ThreadPoolExecutor() as executor: @@ -175,7 +129,8 @@ def scrape_jobs( for future in as_completed(future_to_site): site_value, scraped_data = future.result() site_to_jobs_dict[site_value] = scraped_data - + + return merged_jobs def convert_to_annual(job_data: dict): if job_data["interval"] == "hourly": job_data["min_amount"] *= 2080 diff --git a/src/main.py b/src/main.py index 59fa7c8..a812053 100644 --- a/src/main.py +++ b/src/main.py @@ -1,22 +1,91 @@ +import asyncio import csv +from typing import List -from jobspy import scrape_jobs +from pymongo import MongoClient, UpdateOne +from telegram import Bot -jobs = scrape_jobs( - # site_name=["indeed", "linkedin", "zip_recruiter", "glassdoor", "google"], - site_name=["glassdoor"], - search_term="software engineer", - google_search_term="software engineer jobs near Tel Aviv Israel since yesterday", - location="Central, Israel", - locations=["Tel Aviv, Israel","Ramat Gan, Israel","Central, Israel","Rehovot ,Israel"], - results_wanted=200, - hours_old=200, - country_indeed='israel', - - # linkedin_fetch_description=True # gets more info such as description, direct job url (slower) - # proxies=["208.195.175.46:65095", "208.195.175.45:65095", "localhost"], -) -print(f"Found {len(jobs)} jobs") +from jobspy import scrape_jobs +from jobspy.jobs import JobPost +TELEGRAM_API_TOKEN = +CHAT_ID = +# Connect to MongoDB server +client = MongoClient("mongodb://localhost:27017/") +# Access a database (it will be created automatically if it doesn't exist) +db = client["jobs_database"] +# Access a collection +jobs_collection = db["jobs"] +# Initialize the Telegram bot +bot = Bot(token=TELEGRAM_API_TOKEN) -# print(jobs.head()) -# jobs.to_csv("jobs.csv", quoting=csv.QUOTE_NONNUMERIC, escapechar="\\", index=False) # to_excel \ No newline at end of file +async def send_job_to_telegram(job:JobPost): + """ + Send job 
 
-# print(jobs.head())
-# jobs.to_csv("jobs.csv", quoting=csv.QUOTE_NONNUMERIC, escapechar="\\", index=False) # to_excel
\ No newline at end of file
+
+async def send_job_to_telegram(job: JobPost):
+    """
+    Send job details to the Telegram chat.
+    """
+    message = (
+        "New Job Posted:\n\n"
+        f"Job ID: {job.id}\n"
+        f"Job Title: {job.title}\n"
+        f"Company: {job.company_name}\n"
+        f"Location: {job.location}\n"
+        f"Link: {job.job_url}\n"
+    )
+    try:
+        await bot.send_message(chat_id=CHAT_ID, text=message)
+        print(f"Sent job to Telegram: {job.id}")
+    except Exception as e:
+        print(f"Failed to send job to Telegram: {e}")
+
+
+def insert_jobs(jobs: List[JobPost], collection):
+    """
+    Perform bulk upserts for a list of JobPost objects into a MongoDB collection.
+    Only insert new jobs and return the list of newly inserted jobs.
+    """
+    operations = []
+    new_jobs = []  # JobPost objects that were newly inserted into MongoDB
+
+    for job in jobs:
+        job_dict = job.model_dump(exclude={"date_posted"})
+        operations.append(
+            UpdateOne(
+                {"id": job.id},  # Match by `id`
+                {"$setOnInsert": job_dict},  # Set fields only when the document is inserted, never on update
+                upsert=True,  # Insert if not found; leave existing documents untouched
+            )
+        )
+
+    if operations:
+        # Execute all operations in a single bulk round trip
+        result = collection.bulk_write(operations)
+        print(f"Matched: {result.matched_count}, Upserts: {result.upserted_count}, Modified: {result.modified_count}")
+
+        # `upserted_ids` maps the index of each operation that performed an
+        # insert to the new document's _id, so it identifies exactly which
+        # jobs were new (upserted_count alone cannot tell us which ones)
+        for i in result.upserted_ids:
+            job = jobs[i]
+            new_jobs.append(job)
+            print(f"New Job ID: {job.id}, Title: {job.title}")
+
+    return new_jobs
+
+
+async def main():
+    jobs = scrape_jobs(
+        # site_name=["indeed", "linkedin", "zip_recruiter", "glassdoor", "google"],
+        site_name=["glassdoor"],
+        search_term="software engineer",
+        google_search_term="software engineer jobs near Tel Aviv Israel since yesterday",
+        location="Central, Israel",
+        locations=["Ramat Gan, Israel"],
+        # locations=["Tel Aviv, Israel", "Ramat Gan, Israel", "Central, Israel", "Rehovot, Israel"],
+        results_wanted=50,
+        hours_old=200,
+        country_indeed='israel',
+    )
+    print(f"Found {len(jobs)} jobs")
+
+    new_jobs = insert_jobs(jobs, jobs_collection)
+
+    for new_job in new_jobs:
+        await send_job_to_telegram(new_job)
+
+
+# Run the async entry point
+if __name__ == "__main__":
+    asyncio.run(main())
\ No newline at end of file
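
Note: the upserts in insert_jobs() match on the scraper-assigned `id` field,
so a unique index on that field keeps the lookup fast and prevents duplicate
documents if two runs race. A minimal one-off sketch, assuming the same local
MongoDB instance the patch uses:

    from pymongo import MongoClient

    client = MongoClient("mongodb://localhost:27017/")
    # unique index on the job id that serves as the upsert key
    client["jobs_database"]["jobs"].create_index("id", unique=True)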